You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 06:14:49 UTC

[flink] branch executors-clean created (now a3624fc)

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git.


      at a3624fc  [TRAVIS] Removed notifications

This branch includes the following new commits:

     new b5380fa  [hotfix] Fix parallelism consolidation logic in the (Stream)ExecutionEnvironment.
     new b26d305  [hotfix] Ignore empty yarn DYNAMIC_PROPERTIES when creating configuration
     new 2b1fd1f  [hotfix] Make the DefaultExecutorServiceLoader a singleton
     new 719bf99  [hotfix] Annotate all the Executor-related interfaces as @Internal
     new 30e3090  [hotfix] Annotate all ClusterClientFactories as @Internal
     new 7722b70  [hotfix] Make ClusterClientFactory.isCompatibleWith case-insensitive
     new f960c8b  [hotfix] Simplify ContextEnvironment construction to use configuration
     new fb18252  [hotfix] CliFrontend.run() merges configurations into one
     new 98e93d5  [FLINK-XXXXX] Update the Executor interface and introduce the JobClient
     new 03cb7ef  [hotfix] Rename configuration param in constructor of (Stream)ExecutionEnv
     new fc18273  [FLINK-XXXXX] Add the Yarn/Standalone Executors
     new 051b9ed  [FLINK-XXXXX] Refactoring the ContextEnvironments
     new 044645b  [FLINK-XXXXX] Fix completeness tests
     new e156e39  [FLINK-XXXXX] Fix ClusterClientFactoryDiscovery
     new 4adba90  [FLINK-XXXXX] Remove redundant CliFrontend.runProgram() method
     new a3624fc  [TRAVIS] Removed notifications

The 16 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[flink] 05/16: [hotfix] Annotate all ClusterClientFactories as @Internal

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 30e3090c6c51dacd1762db16ad3c80c8d442ffa8
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 14:27:05 2019 +0100

    [hotfix] Annotate all ClusterClientFactories as @Internal
---
 .../java/org/apache/flink/client/deployment/ClusterClientFactory.java   | 2 ++
 .../org/apache/flink/client/deployment/StandaloneClientFactory.java     | 2 ++
 .../src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java   | 2 ++
 3 files changed, 6 insertions(+)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
index 36647b6..3c0c5cc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterClientFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
 import javax.annotation.Nullable;
@@ -25,6 +26,7 @@ import javax.annotation.Nullable;
 /**
  * A factory containing all the necessary information for creating clients to Flink clusters.
  */
+@Internal
 public interface ClusterClientFactory<ClusterID> {
 
 	/**
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
index b441a63..f71563e 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 
@@ -28,6 +29,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A {@link ClusterClientFactory} for a standalone cluster, i.e. Flink on bare-metal.
  */
+@Internal
 public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
 
 	public static final String ID = "default";
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index aa138a7..156790f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.configuration.Configuration;
@@ -38,6 +39,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A {@link ClusterClientFactory} for a YARN cluster.
  */
+@Internal
 public class YarnClusterClientFactory implements ClusterClientFactory<ApplicationId> {
 
 	public static final String ID = "yarn-cluster";


[flink] 08/16: [hotfix] CliFrontend.run() merges configurations into one

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fb182525e8962faa483b678d061046062f64924d
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 15:48:35 2019 +0100

    [hotfix] CliFrontend.run() merges configurations into one
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 41 ++++++++++++++--------
 .../flink/client/cli/ExecutionConfigAccessor.java  |  5 +--
 2 files changed, 29 insertions(+), 17 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 38243fc..d82b377 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -208,33 +208,44 @@ public class CliFrontend {
 			throw new CliArgsException("Could not build the program from JAR file.", e);
 		}
 
-		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
-		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
-
 		final List<URL> jobJars = program.getJobJarAndDependencies();
-		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(programOptions, jobJars);
-		final Configuration executionConfig = executionParameters.getConfiguration();
+		final Configuration effectiveConfiguration =
+				getEffectiveConfiguration(commandLine, programOptions, jobJars);
 
 		try {
-			runProgram(executorConfig, executionConfig, program);
+			runProgram(effectiveConfiguration, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
 	}
 
+	private Configuration getEffectiveConfiguration(
+			final CommandLine commandLine,
+			final ProgramOptions programOptions,
+			final List<URL> jobJars) throws FlinkException {
+
+		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(checkNotNull(commandLine));
+		final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromProgramOptions(
+				checkNotNull(programOptions),
+				checkNotNull(jobJars));
+
+		final Configuration executorConfig = customCommandLine.applyCommandLineOptionsToConfiguration(commandLine);
+		final Configuration effectiveConfiguration = new Configuration(executorConfig);
+		return executionParameters.applyToConfiguration(effectiveConfiguration);
+	}
+
 	private <ClusterID> void runProgram(
-			Configuration executorConfig,
-			Configuration executionConfig,
+			Configuration configuration,
 			PackagedProgram program) throws ProgramInvocationException, FlinkException {
 
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executorConfig);
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
 		checkNotNull(clusterClientFactory);
 
-		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executorConfig);
+		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
 
 		try {
-			final ClusterID clusterId = clusterClientFactory.getClusterId(executorConfig);
-			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+			final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
+			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
 			final ClusterClient<ClusterID> client;
 
 			// directly deploy the job if the cluster is started in job mode and detached
@@ -243,7 +254,7 @@ public class CliFrontend {
 
 				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
 
-				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
+				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
 				client = clusterDescriptor.deployJobCluster(
 					clusterSpecification,
 					jobGraph,
@@ -264,7 +275,7 @@ public class CliFrontend {
 				} else {
 					// also in job mode we have to deploy a session cluster because the job
 					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executorConfig);
+					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
 					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
 					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
 					// there's a race-condition here if cli is killed before shutdown hook is installed
@@ -279,7 +290,7 @@ public class CliFrontend {
 					int userParallelism = executionParameters.getParallelism();
 					LOG.debug("User parallelism is set to {}", userParallelism);
 
-					executeProgram(executionConfig, program, client);
+					executeProgram(configuration, program, client);
 				} finally {
 					if (clusterId == null && !executionParameters.getDetachedMode()) {
 						// terminate the cluster only if we have started it before and if it's not detached
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index 9e570e1..f55560b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -77,8 +77,9 @@ public class ExecutionConfigAccessor {
 		return new ExecutionConfigAccessor(configuration);
 	}
 
-	public Configuration getConfiguration() {
-		return configuration;
+	Configuration applyToConfiguration(final Configuration baseConfiguration) {
+		baseConfiguration.addAll(configuration);
+		return baseConfiguration;
 	}
 
 	public List<URL> getJars() {


[flink] 07/16: [hotfix] Simplify ContextEnvironment construction to use configuration

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f960c8b76a16cc00208e05f1cb6235305dbc9d25
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 15:22:19 2019 +0100

    [hotfix] Simplify ContextEnvironment construction to use configuration
---
 .../java/org/apache/flink/client/ClientUtils.java  | 25 ++-------
 .../flink/client/program/ContextEnvironment.java   | 41 ++++++++------
 .../client/program/ContextEnvironmentFactory.java  | 64 +++++++---------------
 .../apache/flink/client/program/ClientTest.java    |  5 ++
 4 files changed, 57 insertions(+), 78 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 5e53bc3..1654dff 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ContextEnvironmentFactory;
@@ -30,10 +29,10 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -150,33 +149,21 @@ public enum ClientUtils {
 			ClusterClient<?> client,
 			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
 
-		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
-
-		final List<URL> jobJars = executionConfigAccessor.getJars();
-		final List<URL> classpaths = executionConfigAccessor.getClasspaths();
-		final SavepointRestoreSettings savepointSettings = executionConfigAccessor.getSavepointRestoreSettings();
-		final int parallelism = executionConfigAccessor.getParallelism();
-		final boolean detached = executionConfigAccessor.getDetachedMode();
-
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
 
 		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
 		try {
 			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
 
-			LOG.info("Starting program (detached: {})", detached);
+			LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));
 
 			final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
-				client,
-				jobJars,
-				classpaths,
-				userCodeClassLoader,
-				parallelism,
-				detached,
-				savepointSettings,
-				jobExecutionResult);
+					configuration,
+					client,
+					userCodeClassLoader,
+					jobExecutionResult);
 			ContextEnvironment.setAsContext(factory);
 
 			try {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 08a02af..9a03271 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -25,6 +25,8 @@ import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
@@ -32,6 +34,8 @@ import java.net.URL;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Execution Environment for remote execution with the Client.
  */
@@ -54,23 +58,28 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	private boolean alreadyCalled;
 
 	public ContextEnvironment(
-		ClusterClient<?> remoteConnection,
-		List<URL> jarFiles,
-		List<URL> classpaths,
-		ClassLoader userCodeClassLoader,
-		SavepointRestoreSettings savepointSettings,
-		boolean detached,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = remoteConnection;
-		this.jarFilesToAttach = jarFiles;
-		this.classpathsToAttach = classpaths;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.savepointSettings = savepointSettings;
-
-		this.detached = detached;
-		this.alreadyCalled = false;
+			final Configuration configuration,
+			final ClusterClient<?> remoteConnection,
+			final ClassLoader userCodeClassLoader,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+
+		this.client = checkNotNull(remoteConnection);
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
+		final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
+				.fromConfiguration(checkNotNull(configuration));
 
-		this.jobExecutionResult = jobExecutionResult;
+		if (accessor.getParallelism() > 0) {
+			setParallelism(accessor.getParallelism());
+		}
+
+		this.jarFilesToAttach = accessor.getJars();
+		this.classpathsToAttach = accessor.getClasspaths();
+		this.savepointSettings = accessor.getSavepointRestoreSettings();
+		this.detached = accessor.getDetachedMode();
+
+		this.alreadyCalled = false;
 	}
 
 	@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ff7f15b..ab589f2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -22,12 +22,13 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * The factory that instantiates the environment to be used when running jobs that are
  * submitted through a pre-configured client connection.
@@ -35,64 +36,41 @@ import java.util.concurrent.atomic.AtomicReference;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final ClusterClient<?> client;
-
-	private final List<URL> jarFilesToAttach;
+	private final Configuration configuration;
 
-	private final List<URL> classpathsToAttach;
+	private final ClusterClient<?> client;
 
 	private final ClassLoader userCodeClassLoader;
 
-	private final int defaultParallelism;
-
-	private final boolean isDetached;
-
-	private final SavepointRestoreSettings savepointSettings;
-
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
 	public ContextEnvironmentFactory(
-		ClusterClient<?> client,
-		List<URL> jarFilesToAttach,
-		List<URL> classpathsToAttach,
-		ClassLoader userCodeClassLoader,
-		int defaultParallelism,
-		boolean isDetached,
-		SavepointRestoreSettings savepointSettings,
-		AtomicReference<JobExecutionResult> jobExecutionResult) {
-		this.client = client;
-		this.jarFilesToAttach = jarFilesToAttach;
-		this.classpathsToAttach = classpathsToAttach;
-		this.userCodeClassLoader = userCodeClassLoader;
-		this.defaultParallelism = defaultParallelism;
-		this.isDetached = isDetached;
-		this.savepointSettings = savepointSettings;
+			final Configuration configuration,
+			final ClusterClient<?> client,
+			final ClassLoader userCodeClassLoader,
+			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+		this.configuration = checkNotNull(configuration);
+		this.client = checkNotNull(client);
+		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+
 		this.alreadyCalled = false;
-		this.jobExecutionResult = jobExecutionResult;
 	}
 
 	@Override
 	public ExecutionEnvironment createExecutionEnvironment() {
 		verifyCreateIsCalledOnceWhenInDetachedMode();
-
-		final ContextEnvironment environment = new ContextEnvironment(
-			client,
-			jarFilesToAttach,
-			classpathsToAttach,
-			userCodeClassLoader,
-			savepointSettings,
-			isDetached,
-			jobExecutionResult);
-		if (defaultParallelism > 0) {
-			environment.setParallelism(defaultParallelism);
-		}
-		return environment;
+		return new ContextEnvironment(
+				configuration,
+				client,
+				userCodeClassLoader,
+				jobExecutionResult);
 	}
 
 	private void verifyCreateIsCalledOnceWhenInDetachedMode() {
-		if (isDetached && alreadyCalled) {
+		if (!configuration.getBoolean(DeploymentOptions.ATTACHED) && alreadyCalled) {
 			throw new InvalidProgramException("Multiple environments cannot be created in detached mode");
 		}
 		alreadyCalled = true;
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 5845080..6e53abb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -193,6 +194,10 @@ public class ClientTest extends TestLogger {
 	@Test
 	public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
 		PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
+
+		when(packagedProgramMock.getUserCodeClassLoader())
+				.thenReturn(packagedProgramMock.getClass().getClassLoader());
+
 		doAnswer(new Answer<Void>() {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Throwable {


[flink] 16/16: [TRAVIS] Removed notifications

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a3624fc63e42f773ead2107cd6e6cf3b2be6e2a5
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 22:02:10 2019 +0100

    [TRAVIS] Removed notifications
---
 .travis.yml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/.travis.yml b/.travis.yml
index 65cec31..e6ffbb5 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -63,6 +63,9 @@ notifications:
     rooms:
       - secure: ikPQn5JTpkyzxVyOPm/jIl3FPm6hY8xAdG4pSwxGWjBqF+NmmNTp9YZsJ6fD8xPql6T5n1hNDbZSC14jVUw/vvXGvibDXLN+06f25ZQl+4LJBXaiR7gTG6y3nO8G90Vw7XpvCme6n5Md9tvjygb17a4FEgRJFfwzWnnyPA1yvK0=
     on_success: never
+    on_failure: never
+    on_cancel: never
+    on_error: never
     on_pull_requests: false
   webhooks:
     urls:


[flink] 15/16: [FLINK-XXXXX] Remove redundant CliFrontend.runProgram() method

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4adba90f5e627c7b3704f96fafc7ad98f12a3fcd
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 21:50:27 2019 +0100

    [FLINK-XXXXX] Remove redundant CliFrontend.runProgram() method
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 91 +---------------------
 1 file changed, 1 insertion(+), 90 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index e8f8179..e1269a0 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -30,7 +30,6 @@ import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.deployment.ClusterClientFactory;
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.ClusterDescriptor;
-import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
@@ -49,7 +48,6 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.security.SecurityConfiguration;
@@ -57,7 +55,6 @@ import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.ShutdownHookUtil;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
@@ -214,7 +211,7 @@ public class CliFrontend {
 				getEffectiveConfiguration(commandLine, programOptions, jobJars);
 
 		try {
-			runProgram(effectiveConfiguration, program);
+			executeProgram(effectiveConfiguration, program);
 		} finally {
 			program.deleteExtractedLibraries();
 		}
@@ -235,92 +232,6 @@ public class CliFrontend {
 		return executionParameters.applyToConfiguration(effectiveConfiguration);
 	}
 
-	private <ClusterID> void runProgram(
-			Configuration configuration,
-			PackagedProgram program) throws ProgramInvocationException, FlinkException {
-
-		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
-		checkNotNull(clusterClientFactory);
-
-		final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration);
-
-		try {
-			final ClusterID clusterId = clusterClientFactory.getClusterId(configuration);
-			final ExecutionConfigAccessor executionParameters = ExecutionConfigAccessor.fromConfiguration(configuration);
-			final ClusterClient<ClusterID> client;
-
-			// directly deploy the job if the cluster is started in job mode and detached
-			if (clusterId == null && executionParameters.getDetachedMode()) {
-				int parallelism = executionParameters.getParallelism() == -1 ? defaultParallelism : executionParameters.getParallelism();
-
-				final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
-
-				final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-				client = clusterDescriptor.deployJobCluster(
-					clusterSpecification,
-					jobGraph,
-					executionParameters.getDetachedMode());
-
-				logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
-
-				try {
-					client.close();
-				} catch (Exception e) {
-					LOG.info("Could not properly shut down the client.", e);
-				}
-			} else {
-				final Thread shutdownHook;
-				if (clusterId != null) {
-					client = clusterDescriptor.retrieve(clusterId);
-					shutdownHook = null;
-				} else {
-					// also in job mode we have to deploy a session cluster because the job
-					// might consist of multiple parts (e.g. when using collect)
-					final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
-					client = clusterDescriptor.deploySessionCluster(clusterSpecification);
-					// if not running in detached mode, add a shutdown hook to shut down cluster if client exits
-					// there's a race-condition here if cli is killed before shutdown hook is installed
-					if (!executionParameters.getDetachedMode() && executionParameters.isShutdownOnAttachedExit()) {
-						shutdownHook = ShutdownHookUtil.addShutdownHook(client::shutDownCluster, client.getClass().getSimpleName(), LOG);
-					} else {
-						shutdownHook = null;
-					}
-				}
-
-				try {
-					int userParallelism = executionParameters.getParallelism();
-					LOG.debug("User parallelism is set to {}", userParallelism);
-
-					executeProgram(configuration, program);
-				} finally {
-					if (clusterId == null && !executionParameters.getDetachedMode()) {
-						// terminate the cluster only if we have started it before and if it's not detached
-						try {
-							client.shutDownCluster();
-						} catch (final Exception e) {
-							LOG.info("Could not properly terminate the Flink cluster.", e);
-						}
-						if (shutdownHook != null) {
-							// we do not need the hook anymore as we have just tried to shutdown the cluster.
-							ShutdownHookUtil.removeShutdownHook(shutdownHook, client.getClass().getSimpleName(), LOG);
-						}
-					}
-					try {
-						client.close();
-					} catch (Exception e) {
-						LOG.info("Could not properly shut down the client.", e);
-					}
-				}
-			}
-		} finally {
-			try {
-				clusterDescriptor.close();
-			} catch (Exception e) {
-				LOG.info("Could not properly close the cluster descriptor.", e);
-			}
-		}
-	}
-
 	/**
 	 * Executes the info action.
 	 *


[flink] 03/16: [hotfix] Make the DefaultExecutorServiceLoader a singleton

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b1fd1fcc80657215ad325a9191ae5e3e77fffda
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 14:20:46 2019 +0100

    [hotfix] Make the DefaultExecutorServiceLoader a singleton
---
 .../flink/core/execution/DefaultExecutorServiceLoader.java   | 12 +++++++++---
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java |  2 +-
 .../api/environment/StreamExecutionEnvironment.java          |  2 +-
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 241feab..8bde967 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -45,6 +45,8 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 
 	private static final ServiceLoader<ExecutorFactory> defaultLoader = ServiceLoader.load(ExecutorFactory.class);
 
+	public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
+
 	@Override
 	public ExecutorFactory getExecutorFactory(final Configuration configuration) {
 		checkNotNull(configuration);
@@ -67,14 +69,18 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 		}
 
 		if (compatibleFactories.size() > 1) {
-			final List<String> configStr =
+			final String configStr =
 					configuration.toMap().entrySet().stream()
 							.map(e -> e.getKey() + "=" + e.getValue())
-							.collect(Collectors.toList());
+							.collect(Collectors.joining("\n"));
 
-			throw new IllegalStateException("Multiple compatible client factories found for:\n" + String.join("\n", configStr) + ".");
+			throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
 		}
 
 		return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
 	}
+
+	private DefaultExecutorServiceLoader() {
+		// make sure nobody instantiates us explicitly.
+	}
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5b07843..26632e6 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -140,7 +140,7 @@ public class ExecutionEnvironment {
 	}
 
 	protected ExecutionEnvironment(final Configuration executorConfiguration) {
-		this(new DefaultExecutorServiceLoader(), executorConfiguration);
+		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
 	}
 
 	protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3870b52..ba702ea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -163,7 +163,7 @@ public class StreamExecutionEnvironment {
 	}
 
 	public StreamExecutionEnvironment(final Configuration executorConfiguration) {
-		this(new DefaultExecutorServiceLoader(), executorConfiguration);
+		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
 	}
 
 	public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {


[flink] 14/16: [FLINK-XXXXX] Fix ClusterClientFactoryDiscovery

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e156e3942e033403ad4b64d97c7289a98e55a081
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 19 06:25:28 2019 +0100

    [FLINK-XXXXX] Fix ClusterClientFactoryDiscovery
---
 .../main/java/org/apache/flink/client/cli/DefaultCLI.java |  5 +++--
 .../flink/client/deployment/StandaloneClientFactory.java  |  5 ++---
 .../client/deployment/ClusterClientServiceLoaderTest.java |  3 ++-
 .../org/apache/flink/yarn/YarnClusterClientFactory.java   |  8 +++++---
 .../org/apache/flink/yarn/cli/FlinkYarnSessionCli.java    |  8 +++++---
 .../apache/flink/yarn/YarnClusterClientFactoryTest.java   | 15 +++++++++++++--
 6 files changed, 30 insertions(+), 14 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index 397d5dd..1245688 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.cli;
 
-import org.apache.flink.client.deployment.StandaloneClientFactory;
 import org.apache.flink.configuration.Configuration;
 
 import org.apache.commons.cli.CommandLine;
@@ -29,6 +28,8 @@ import org.apache.commons.cli.Options;
  */
 public class DefaultCLI extends AbstractCustomCommandLine {
 
+	public static final String ID = "default";
+
 	public DefaultCLI(Configuration configuration) {
 		super(configuration);
 	}
@@ -41,7 +42,7 @@ public class DefaultCLI extends AbstractCustomCommandLine {
 
 	@Override
 	public String getId() {
-		return StandaloneClientFactory.ID;
+		return ID;
 	}
 
 	@Override
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
index 647f14f..e5ec6f4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.client.deployment;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 
@@ -32,12 +33,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class StandaloneClientFactory implements ClusterClientFactory<StandaloneClusterId> {
 
-	public static final String ID = "default";
-
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
 		checkNotNull(configuration);
-		return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+		return StandaloneSessionClusterExecutor.NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
 	}
 
 	@Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
index a8e34ab..b7a9953 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 
@@ -57,7 +58,7 @@ public class ClusterClientServiceLoaderTest {
 	@Test
 	public void testStandaloneClusterClientFactoryDiscovery() {
 		final Configuration config = new Configuration();
-		config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID.toUpperCase());
+		config.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
 
 		ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
 		assertTrue(factory instanceof StandaloneClientFactory);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index fd23699..1b1d9de 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -26,6 +26,8 @@ import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -42,12 +44,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class YarnClusterClientFactory implements ClusterClientFactory<ApplicationId> {
 
-	public static final String ID = "yarn-cluster";
-
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
 		checkNotNull(configuration);
-		return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+		final String deploymentTarget = configuration.getString(DeploymentOptions.TARGET);
+		return YarnJobClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget) ||
+				YarnSessionClusterExecutor.NAME.equalsIgnoreCase(deploymentTarget);
 	}
 
 	@Override
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 388dea0..7ef70dc 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -43,7 +43,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.ShutdownHookUtil;
-import org.apache.flink.yarn.YarnClusterClientFactory;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
@@ -105,6 +104,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 
 	private static final long CLIENT_POLLING_INTERVAL_MS = 3000L;
 
+	/** The id for the CommandLine interface. */
+	private static final String ID = "yarn-cluster";
+
 	// YARN-session related constants
 	private static final String YARN_PROPERTIES_FILE = ".yarn-properties-";
 	private static final String YARN_APPLICATION_ID_KEY = "applicationID";
@@ -319,14 +321,14 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 	@Override
 	public boolean isActive(CommandLine commandLine) {
 		String jobManagerOption = commandLine.getOptionValue(addressOption.getOpt(), null);
-		boolean yarnJobManager = YarnClusterClientFactory.ID.equals(jobManagerOption);
+		boolean yarnJobManager = ID.equals(jobManagerOption);
 		boolean yarnAppId = commandLine.hasOption(applicationId.getOpt());
 		return yarnJobManager || yarnAppId || (isYarnPropertiesFileMode(commandLine) && yarnApplicationIdFromYarnProperties != null);
 	}
 
 	@Override
 	public String getId() {
-		return YarnClusterClientFactory.ID;
+		return ID;
 	}
 
 	@Override
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
index 6bdd920..508c11e 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.junit.Test;
@@ -35,9 +37,18 @@ import static org.junit.Assert.assertTrue;
 public class YarnClusterClientFactoryTest {
 
 	@Test
-	public void testYarnClusterClientFactoryDiscovery() {
+	public void testYarnClusterClientFactoryDiscoveryWithPerJobExecutor() {
+		testYarnClusterClientFactoryDiscoveryHelper(YarnJobClusterExecutor.NAME);
+	}
+
+	@Test
+	public void testYarnClusterClientFactoryDiscoveryWithSessionExecutor() {
+		testYarnClusterClientFactoryDiscoveryHelper(YarnSessionClusterExecutor.NAME);
+	}
+
+	private void testYarnClusterClientFactoryDiscoveryHelper(final String targetName) {
 		final Configuration configuration = new Configuration();
-		configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID.toUpperCase());
+		configuration.setString(DeploymentOptions.TARGET, targetName);
 
 		final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
 		final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);


[flink] 11/16: [FLINK-XXXXX] Add the Yarn/Standalone Executors

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit fc182735564bf6a6a0fdd687ec260783ceeda8e2
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 17:10:15 2019 +0100

    [FLINK-XXXXX] Add the Yarn/Standalone Executors
---
 .../deployment/AbstractJobClusterExecutor.java     | 68 ++++++++++++++++-
 .../deployment/AbstractSessionClusterExecutor.java | 65 ++++++++++++++++-
 .../flink/client/deployment/ExecutorUtils.java     | 55 ++++++++++++++
 .../flink/client/deployment/JobClientImpl.java     | 85 +++++++++++++++++++++-
 .../StandaloneSessionClusterExecutor.java          | 38 ++++++++++
 .../StandaloneSessionClusterExecutorFactory.java   | 45 ++++++++++++
 ...org.apache.flink.core.execution.ExecutorFactory | 16 ++++
 .../yarn/executors/YarnJobClusterExecutor.java     | 41 +++++++++++
 .../executors/YarnJobClusterExecutorFactory.java   | 45 ++++++++++++
 .../yarn/executors/YarnSessionClusterExecutor.java | 39 ++++++++++
 .../YarnSessionClusterExecutorFactory.java         | 45 ++++++++++++
 ...org.apache.flink.core.execution.ExecutorFactory | 17 +++++
 12 files changed, 556 insertions(+), 3 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
index 14a93bc..bca403a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -1,4 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
-public class AbstractJobClusterExecutor {
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on dedicated (per-job) clusters.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
+
+	private final ClientFactory clusterClientFactory;
+
+	public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+		this.clusterClientFactory = checkNotNull(clusterClientFactory);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+		final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
+
+			final ClusterClient<ClusterID> clusterClient = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode());
+			LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+			return CompletableFuture.completedFuture(new JobClientImpl<>(clusterClient, jobGraph.getJobID()));
+		}
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
index ab8cc01..413ff58 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -1,4 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
-public class AbstractSessionClusterExecutor {
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * An abstract {@link Executor} used to execute {@link Pipeline pipelines} on an existing (session) cluster.
+ *
+ * @param <ClusterID> the type of the id of the cluster.
+ * @param <ClientFactory> the type of the {@link ClusterClientFactory} used to create/retrieve a client to the target cluster.
+ */
+@Internal
+public class AbstractSessionClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements Executor {
+
+	private final ClientFactory clusterClientFactory;
+
+	public AbstractSessionClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
+		this.clusterClientFactory = checkNotNull(clusterClientFactory);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
+		final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+			checkState(clusterID != null);
+
+			final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID);
+			return clusterClient
+					.submitJob(jobGraph)
+					.thenApply(JobSubmissionResult::getJobID)
+					.thenApply(jobID -> new JobClientImpl<>(clusterClient, jobID));
+		}
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
index e134206..4541b1c 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -1,4 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import javax.annotation.Nonnull;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class with method related to job execution.
+ */
 public class ExecutorUtils {
+
+	/**
+	 * Creates the {@link JobGraph} corresponding to the provided {@link Pipeline}.
+	 *
+	 * @param pipeline the pipeline whose job graph we are computing
+	 * @param configuration the configuration with the necessary information such as jars and
+	 *                         classpaths to be included, the parallelism of the job and potential
+	 *                         savepoint settings used to bootstrap its state.
+	 * @return the corresponding {@link JobGraph}.
+	 */
+	public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
+		checkNotNull(pipeline);
+		checkNotNull(configuration);
+
+		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+		final JobGraph jobGraph = FlinkPipelineTranslationUtil
+				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+		jobGraph.addJars(executionConfigAccessor.getJars());
+		jobGraph.setClasspaths(executionConfigAccessor.getClasspaths());
+		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+		return jobGraph;
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
index e811fc8..29d8008 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
@@ -1,4 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.flink.client.deployment;
 
-public class JobClientImpl {
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An implementation of the {@link JobClient} interface.
+ */
+public class JobClientImpl<ClusterID> implements JobClient {
+
+	private final ClusterClient<ClusterID> clusterClient;
+
+	private final JobID jobID;
+
+	public JobClientImpl(final ClusterClient<ClusterID> clusterClient, final JobID jobID) {
+		this.jobID = checkNotNull(jobID);
+		this.clusterClient = checkNotNull(clusterClient);
+	}
+
+	@Override
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	@Override
+	public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+		return CompletableFuture.completedFuture(new DetachedJobExecutionResult(jobID));
+	}
+
+	@Override
+	public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader) {
+		final CompletableFuture<JobExecutionResult> res = new CompletableFuture<>();
+
+		final CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobID);
+		jobResultFuture.whenComplete(((jobResult, throwable) -> {
+			if (throwable != null) {
+				ExceptionUtils.checkInterrupted(throwable);
+				res.completeExceptionally(new ProgramInvocationException("Could not run job", jobID, throwable));
+			} else {
+				try {
+					final JobExecutionResult result = jobResult.toJobExecutionResult(userClassloader);
+					res.complete(result);
+				} catch (JobExecutionException | IOException | ClassNotFoundException e) {
+					res.completeExceptionally(new ProgramInvocationException("Job failed", jobID, e));
+				}
+			}
+		}));
+		return res;
+	}
+
+	@Override
+	public void close() throws Exception {
+		this.clusterClient.close();
+	}
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
new file mode 100644
index 0000000..f097323
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
+import org.apache.flink.client.deployment.StandaloneClientFactory;
+import org.apache.flink.client.deployment.StandaloneClusterId;
+import org.apache.flink.core.execution.Executor;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ */
+@Internal
+public class StandaloneSessionClusterExecutor extends AbstractSessionClusterExecutor<StandaloneClusterId, StandaloneClientFactory> {
+
+	public static final String NAME = "standalone-session-cluster";
+
+	public StandaloneSessionClusterExecutor() {
+		super(new StandaloneClientFactory());
+	}
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
new file mode 100644
index 0000000..43c116e
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/StandaloneSessionClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ */
+@Internal
+public class StandaloneSessionClusterExecutorFactory implements ExecutorFactory {
+
+	@Override
+	public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+		return configuration.get(DeploymentOptions.TARGET)
+				.equalsIgnoreCase(StandaloneSessionClusterExecutor.NAME);
+	}
+
+	@Override
+	public Executor getExecutor(@Nonnull final Configuration configuration) {
+		return new StandaloneSessionClusterExecutor();
+	}
+}
diff --git a/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..d9b144f
--- /dev/null
+++ b/flink-clients/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutorFactory
\ No newline at end of file
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
new file mode 100644
index 0000000..084b020
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractJobClusterExecutor;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an error.
+ */
+@Internal
+public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
+
+	public static final String NAME = "yarn-job-cluster";
+
+	public YarnJobClusterExecutor() {
+		super(new YarnClusterClientFactory());
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
new file mode 100644
index 0000000..9dc6fd1
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnJobClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on dedicated (per-job) clusters.
+ */
+@Internal
+public class YarnJobClusterExecutorFactory implements ExecutorFactory {
+
+	@Override
+	public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+		return configuration.get(DeploymentOptions.TARGET)
+				.equalsIgnoreCase(YarnJobClusterExecutor.NAME);
+	}
+
+	@Override
+	public Executor getExecutor(@Nonnull final Configuration configuration) {
+		return new YarnJobClusterExecutor();
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
new file mode 100644
index 0000000..873dce4
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutor.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.client.deployment.AbstractSessionClusterExecutor;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.yarn.YarnClusterClientFactory;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ */
+@Internal
+public class YarnSessionClusterExecutor extends AbstractSessionClusterExecutor<ApplicationId, YarnClusterClientFactory> {
+
+	public static final String NAME = "yarn-session-cluster";
+
+	public YarnSessionClusterExecutor() {
+		super(new YarnClusterClientFactory());
+	}
+}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
new file mode 100644
index 0000000..101a622
--- /dev/null
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/executors/YarnSessionClusterExecutorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.executors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+
+import javax.annotation.Nonnull;
+
+/**
+ * An {@link ExecutorFactory} for executing jobs on an existing (session) cluster.
+ */
+@Internal
+public class YarnSessionClusterExecutorFactory implements ExecutorFactory {
+
+	@Override
+	public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
+		return configuration.get(DeploymentOptions.TARGET)
+				.equalsIgnoreCase(YarnSessionClusterExecutor.NAME);
+	}
+
+	@Override
+	public Executor getExecutor(@Nonnull final Configuration configuration) {
+		return new YarnSessionClusterExecutor();
+	}
+}
diff --git a/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
new file mode 100644
index 0000000..d56f8c5
--- /dev/null
+++ b/flink-yarn/src/main/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -0,0 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
+org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
\ No newline at end of file


[flink] 04/16: [hotfix] Annotate all the Executor-related interfaces as @Internal

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 719bf99de9655c8c47b6e5c6effdad1b8a93ee14
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 14:22:03 2019 +0100

    [hotfix] Annotate all the Executor-related interfaces as @Internal
---
 .../apache/flink/core/execution/DefaultExecutorServiceLoader.java | 6 +++++-
 .../src/main/java/org/apache/flink/core/execution/Executor.java   | 7 ++++++-
 .../java/org/apache/flink/core/execution/ExecutorFactory.java     | 8 ++++++--
 .../org/apache/flink/core/execution/ExecutorServiceLoader.java    | 6 +++++-
 .../java/org/apache/flink/api/java/ExecutorDiscoveryTest.java     | 6 ++++--
 .../apache/flink/streaming/environment/ExecutorDiscoveryTest.java | 6 ++++--
 6 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 8bde967..a088f8d 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -35,6 +38,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
  * Java service discovery to find the available {@link ExecutorFactory executor factories}.
  */
+@Internal
 public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 
 	// TODO: This code is almost identical to the ClusterClientServiceLoader and its default implementation.
@@ -48,7 +52,7 @@ public class DefaultExecutorServiceLoader implements ExecutorServiceLoader {
 	public static final DefaultExecutorServiceLoader INSTANCE = new DefaultExecutorServiceLoader();
 
 	@Override
-	public ExecutorFactory getExecutorFactory(final Configuration configuration) {
+	public ExecutorFactory getExecutorFactory(@Nonnull final Configuration configuration) {
 		checkNotNull(configuration);
 
 		final List<ExecutorFactory> compatibleFactories = new ArrayList<>();
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 3476742..7069e70 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -18,20 +18,25 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nonnull;
+
 /**
  * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
  */
+@Internal
 public interface Executor {
 
 	/**
 	 * Executes a {@link Pipeline} based on the provided configuration.
+	 *
 	 * @param pipeline the {@link Pipeline} to execute
 	 * @param configuration the {@link Configuration} with the required execution parameters
 	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
 	 */
-	JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception;
+	JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java
index 8d6687b..9d6860a 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorFactory.java
@@ -18,23 +18,27 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nonnull;
+
 /**
  * A factory for selecting and instantiating the adequate {@link Executor}
  * based on a provided {@link Configuration}.
  */
+@Internal
 public interface ExecutorFactory {
 
 	/**
 	 * Returns {@code true} if this factory is compatible with the options in the
 	 * provided configuration, {@code false} otherwise.
 	 */
-	boolean isCompatibleWith(Configuration configuration);
+	boolean isCompatibleWith(@Nonnull final Configuration configuration);
 
 	/**
 	 * Instantiates an {@link Executor} compatible with the provided configuration.
 	 * @return the executor instance.
 	 */
-	Executor getExecutor(Configuration configuration);
+	Executor getExecutor(@Nonnull Configuration configuration);
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java
index 0b959eb7..5aee4ee 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/ExecutorServiceLoader.java
@@ -18,12 +18,16 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nonnull;
+
 /**
  * An interface to be implemented by the entity responsible for finding the correct {@link Executor} to
  * execute a given {@link org.apache.flink.api.dag.Pipeline}.
  */
+@Internal
 public interface ExecutorServiceLoader {
 
 	/**
@@ -35,5 +39,5 @@ public interface ExecutorServiceLoader {
 	 * @throws Exception if there is more than one compatible factories, or something went wrong when
 	 * 			loading the registered factories.
 	 */
-	ExecutorFactory getExecutorFactory(Configuration configuration) throws Exception;
+	ExecutorFactory getExecutorFactory(@Nonnull final Configuration configuration) throws Exception;
 }
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 49013b8..2d46915 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -29,6 +29,8 @@ import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -69,12 +71,12 @@ public class ExecutorDiscoveryTest {
 		public static final String ID = "test-executor-A";
 
 		@Override
-		public boolean isCompatibleWith(Configuration configuration) {
+		public boolean isCompatibleWith(@Nonnull Configuration configuration) {
 			return ID.equals(configuration.get(DeploymentOptions.TARGET));
 		}
 
 		@Override
-		public Executor getExecutor(Configuration configuration) {
+		public Executor getExecutor(@Nonnull Configuration configuration) {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
 				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index 9c11fdf..2a1bb4a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -70,12 +72,12 @@ public class ExecutorDiscoveryTest {
 		public static final String ID = "test-executor-A";
 
 		@Override
-		public boolean isCompatibleWith(Configuration configuration) {
+		public boolean isCompatibleWith(@Nonnull Configuration configuration) {
 			return ID.equals(configuration.get(DeploymentOptions.TARGET));
 		}
 
 		@Override
-		public Executor getExecutor(Configuration configuration) {
+		public Executor getExecutor(@Nonnull Configuration configuration) {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
 				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));


[flink] 06/16: [hotfix] Make ClusterClientFactory.isCompatibleWith case-insensitive

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7722b70041a9e2392e2592db5db9e7ab9ef2fa65
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 14:30:45 2019 +0100

    [hotfix] Make ClusterClientFactory.isCompatibleWith case-insensitive
---
 .../org/apache/flink/client/deployment/StandaloneClientFactory.java     | 2 +-
 .../apache/flink/client/deployment/ClusterClientServiceLoaderTest.java  | 2 +-
 .../src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java   | 2 +-
 .../test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java   | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
index f71563e..647f14f 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClientFactory.java
@@ -37,7 +37,7 @@ public class StandaloneClientFactory implements ClusterClientFactory<StandaloneC
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
 		checkNotNull(configuration);
-		return ID.equals(configuration.getString(DeploymentOptions.TARGET));
+		return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
 	}
 
 	@Override
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
index a084021..a8e34ab 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/ClusterClientServiceLoaderTest.java
@@ -57,7 +57,7 @@ public class ClusterClientServiceLoaderTest {
 	@Test
 	public void testStandaloneClusterClientFactoryDiscovery() {
 		final Configuration config = new Configuration();
-		config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID);
+		config.setString(DeploymentOptions.TARGET, StandaloneClientFactory.ID.toUpperCase());
 
 		ClusterClientFactory<StandaloneClusterId> factory = serviceLoaderUnderTest.getClusterClientFactory(config);
 		assertTrue(factory instanceof StandaloneClientFactory);
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
index 156790f..fd23699 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterClientFactory.java
@@ -47,7 +47,7 @@ public class YarnClusterClientFactory implements ClusterClientFactory<Applicatio
 	@Override
 	public boolean isCompatibleWith(Configuration configuration) {
 		checkNotNull(configuration);
-		return ID.equals(configuration.getString(DeploymentOptions.TARGET));
+		return ID.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
 	}
 
 	@Override
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
index 931313a..6bdd920 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnClusterClientFactoryTest.java
@@ -37,7 +37,7 @@ public class YarnClusterClientFactoryTest {
 	@Test
 	public void testYarnClusterClientFactoryDiscovery() {
 		final Configuration configuration = new Configuration();
-		configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID);
+		configuration.setString(DeploymentOptions.TARGET, YarnClusterClientFactory.ID.toUpperCase());
 
 		final ClusterClientServiceLoader serviceLoader = new DefaultClusterClientServiceLoader();
 		final ClusterClientFactory<ApplicationId> factory = serviceLoader.getClusterClientFactory(configuration);


[flink] 10/16: [hotfix] Rename configuration param in constructor of (Stream)ExecutionEnv

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 03cb7ef4c919f6f5496689bba6edb6feb69d1a3e
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 16:40:52 2019 +0100

    [hotfix] Rename configuration param in constructor of (Stream)ExecutionEnv
---
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java   |  8 ++++----
 .../streaming/api/environment/StreamExecutionEnvironment.java  | 10 ++++------
 2 files changed, 8 insertions(+), 10 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index d485105..6e64788 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -142,13 +142,13 @@ public class ExecutionEnvironment {
 		this(new Configuration());
 	}
 
-	protected ExecutionEnvironment(final Configuration executorConfiguration) {
-		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
+	protected ExecutionEnvironment(final Configuration configuration) {
+		this(DefaultExecutorServiceLoader.INSTANCE, configuration);
 	}
 
-	protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
+	protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration configuration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
-		this.configuration = checkNotNull(executorConfiguration);
+		this.configuration = checkNotNull(configuration);
 		this.userClassloader = getClass().getClassLoader();
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 176a7ab..b8544cf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -165,15 +165,13 @@ public class StreamExecutionEnvironment {
 		this(new Configuration());
 	}
 
-	public StreamExecutionEnvironment(final Configuration executorConfiguration) {
-		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
+	public StreamExecutionEnvironment(final Configuration configuration) {
+		this(DefaultExecutorServiceLoader.INSTANCE, configuration);
 	}
 
-	public StreamExecutionEnvironment(
-			final ExecutorServiceLoader executorServiceLoader,
-			final Configuration executorConfiguration) {
+	public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration configuration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
-		this.configuration = checkNotNull(executorConfiguration);
+		this.configuration = checkNotNull(configuration);
 		this.userClassloader = getClass().getClassLoader();
 	}
 


[flink] 01/16: [hotfix] Fix parallelism consolidation logic in the (Stream)ExecutionEnvironment.

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5380faec18ee53a1fe14bf71c531292199baf41
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Thu Nov 14 15:41:57 2019 +0100

    [hotfix] Fix parallelism consolidation logic in the (Stream)ExecutionEnvironment.
---
 .../java/org/apache/flink/api/java/ExecutionEnvironment.java   | 10 ++--------
 .../streaming/api/environment/StreamExecutionEnvironment.java  | 10 ++--------
 2 files changed, 4 insertions(+), 16 deletions(-)

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1c6fca1..5b07843 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -801,15 +801,9 @@ public class ExecutionEnvironment {
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
-		final int execParallelism = getParallelism();
-		if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-			return;
+		if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+			configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
 		}
-
-		// if parallelism is set in the ExecutorConfig, then
-		// that value takes precedence over any other value.
-
-		configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
 	}
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 721869c..3870b52 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -1556,15 +1556,9 @@ public class StreamExecutionEnvironment {
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
-		final int execParallelism = getParallelism();
-		if (execParallelism == ExecutionConfig.PARALLELISM_DEFAULT) {
-			return;
+		if (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT) {
+			configuration.getOptional(CoreOptions.DEFAULT_PARALLELISM).ifPresent(this::setParallelism);
 		}
-
-		// if parallelism is set in the ExecutorConfig, then
-		// that value takes precedence over any other value.
-
-		configuration.set(CoreOptions.DEFAULT_PARALLELISM, execParallelism);
 	}
 
 	/**


[flink] 09/16: [FLINK-XXXXX] Update the Executor interface and introduce the JobClient

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 98e93d522dba8218b44024290862e5424fc35b39
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 16:31:04 2019 +0100

    [FLINK-XXXXX] Update the Executor interface and introduce the JobClient
---
 .../deployment/AbstractJobClusterExecutor.java     |  4 ++
 .../deployment/AbstractSessionClusterExecutor.java |  4 ++
 .../flink/client/deployment/ExecutorUtils.java     |  4 ++
 .../flink/client/deployment/JobClientImpl.java     |  4 ++
 .../org/apache/flink/core/execution/Executor.java  | 13 +++--
 .../execution/{Executor.java => JobClient.java}    | 31 ++++++++----
 .../flink/api/java/ExecutionEnvironment.java       | 19 ++++++-
 ...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++----
 ...org.apache.flink.core.execution.ExecutorFactory |  2 +-
 .../environment/StreamExecutionEnvironment.java    | 20 +++++++-
 ...java => ExecutorDiscoveryAndJobClientTest.java} | 59 ++++++++++++++++++----
 ...org.apache.flink.core.execution.ExecutorFactory |  2 +-
 12 files changed, 180 insertions(+), 41 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
new file mode 100644
index 0000000..14a93bc
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractJobClusterExecutor.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class AbstractJobClusterExecutor {
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
new file mode 100644
index 0000000..ab8cc01
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/AbstractSessionClusterExecutor.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class AbstractSessionClusterExecutor {
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
new file mode 100644
index 0000000..e134206
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ExecutorUtils.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class ExecutorUtils {
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
new file mode 100644
index 0000000..e811fc8
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/JobClientImpl.java
@@ -0,0 +1,4 @@
+package org.apache.flink.client.deployment;
+
+public class JobClientImpl {
+}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 7069e70..1d606e8 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -19,12 +19,13 @@
 package org.apache.flink.core.execution;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
 import javax.annotation.Nonnull;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
  */
@@ -32,11 +33,15 @@ import javax.annotation.Nonnull;
 public interface Executor {
 
 	/**
-	 * Executes a {@link Pipeline} based on the provided configuration.
+	 * Executes a {@link Pipeline} based on the provided configuration and returns a {@link JobClient} which allows to
+	 * interact with the job being executed, e.g. cancel it or take a savepoint.
+	 *
+	 * <p><b>ATTENTION:</b> The caller is responsible for managing the lifecycle of the returned {@link JobClient}. This
+	 * means that e.g. {@code close()} should be called explicitly at the call-site.
 	 *
 	 * @param pipeline the {@link Pipeline} to execute
 	 * @param configuration the {@link Configuration} with the required execution parameters
-	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
+	 * @return a {@link CompletableFuture} with the {@link JobClient} corresponding to the pipeline.
 	 */
-	JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
+	CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
similarity index 53%
copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 7069e70..06310bd 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -18,25 +18,34 @@
 
 package org.apache.flink.core.execution;
 
-import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.configuration.Configuration;
+import org.apache.flink.api.common.JobID;
 
 import javax.annotation.Nonnull;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
- * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
+ * A client that is scoped to a specific job.
  */
-@Internal
-public interface Executor {
+@PublicEvolving
+public interface JobClient extends AutoCloseable {
+
+	/**
+	 * Returns the {@link JobID} that uniquely identifies the job this client is scoped to.
+	 */
+	JobID getJobID();
+
+	/**
+	 * Returns the result of the job submission which will also contain the job id of the submitted job.
+	 */
+	CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
 	/**
-	 * Executes a {@link Pipeline} based on the provided configuration.
+	 * Returns the {@link JobExecutionResult result of the job execution} of the submitted job.
 	 *
-	 * @param pipeline the {@link Pipeline} to execute
-	 * @param configuration the {@link Configuration} with the required execution parameters
-	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
+	 * @param userClassloader the classloader used to de-serialize the accumulators of the job.
 	 */
-	JobExecutionResult execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception;
+	CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader);
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 26632e6..d485105 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -57,6 +57,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -132,6 +133,8 @@ public class ExecutionEnvironment {
 
 	private final Configuration configuration;
 
+	private ClassLoader userClassloader;
+
 	/**
 	 * Creates a new Execution Environment.
 	 */
@@ -146,6 +149,11 @@ public class ExecutionEnvironment {
 	protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(executorConfiguration);
+		this.userClassloader = getClass().getClassLoader();
+	}
+
+	protected void setUserClassloader(final ClassLoader userClassloader) {
+		this.userClassloader = checkNotNull(userClassloader);
 	}
 
 	protected Configuration getConfiguration() {
@@ -796,8 +804,15 @@ public class ExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		lastJobExecutionResult = executor.execute(plan, configuration);
-		return lastJobExecutionResult;
+
+		try (final JobClient jobClient = executor.execute(plan, configuration).get()) {
+
+			lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
+
+			return lastJobExecutionResult;
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
similarity index 60%
rename from flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
rename to flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
index 2d46915..c674c7e 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryAndJobClientTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
@@ -34,25 +35,43 @@ import javax.annotation.Nonnull;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsEqual.equalTo;
 
 /**
- * Tests the {@link ExecutorFactory} discovery in the {@link ExecutionEnvironment}.
+ * Tests the {@link ExecutorFactory} discovery in the {@link ExecutionEnvironment} and the calls of the {@link JobClient}.
  */
-public class ExecutorDiscoveryTest {
+public class ExecutorDiscoveryAndJobClientTest {
+
+	private static final String EXEC_NAME = "test-executor";
+	private static final long ATTACHED_RUNTIME = 42L;
+	private static final long DETACHED_RUNTIME = 11L;
+
+	@Test
+	public void jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws Exception {
+		testHelper(true, ATTACHED_RUNTIME);
+	}
 
 	@Test
-	public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
+	public void jobClientGetJobSubmissionResultShouldBeCalledOnDetachedExecution() throws Exception {
+		testHelper(false, DETACHED_RUNTIME);
+	}
+
+	private void testHelper(final boolean attached, final long expectedRuntime) throws Exception {
 		final Configuration configuration = new Configuration();
-		configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+		configuration.set(DeploymentOptions.TARGET, EXEC_NAME);
+		configuration.set(DeploymentOptions.ATTACHED, attached);
 
 		final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
 
 		final String executorName = result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString();
-		assertThat(executorName, is(equalTo(IDReportingExecutorFactory.ID)));
+		assertThat(executorName, is(equalTo(EXEC_NAME)));
+
+		final long runtime = result.getNetRuntime();
+		assertThat(runtime, is(equalTo(expectedRuntime)));
 	}
 
 	private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration) throws Exception {
@@ -68,19 +87,39 @@ public class ExecutorDiscoveryTest {
 	 */
 	public static class IDReportingExecutorFactory implements ExecutorFactory {
 
-		public static final String ID = "test-executor-A";
-
 		@Override
 		public boolean isCompatibleWith(@Nonnull Configuration configuration) {
-			return ID.equals(configuration.get(DeploymentOptions.TARGET));
+			return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
 		}
 
 		@Override
 		public Executor getExecutor(@Nonnull Configuration configuration) {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
-				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
-				return new JobExecutionResult(new JobID(), 12L, res);
+				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(EXEC_NAME));
+
+				return CompletableFuture.completedFuture(new JobClient(){
+
+					@Override
+					public JobID getJobID() {
+						return new JobID();
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), DETACHED_RUNTIME, res));
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), ATTACHED_RUNTIME, res));
+					}
+
+					@Override
+					public void close() {
+
+					}
+				});
 			};
 		}
 	}
diff --git a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index fcfaa55..c09254a 100644
--- a/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.api.java.ExecutorDiscoveryTest$IDReportingExecutorFactory
\ No newline at end of file
+org.apache.flink.api.java.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory
\ No newline at end of file
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ba702ea..176a7ab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -154,6 +155,8 @@ public class StreamExecutionEnvironment {
 
 	private final Configuration configuration;
 
+	private ClassLoader userClassloader;
+
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
@@ -166,9 +169,16 @@ public class StreamExecutionEnvironment {
 		this(DefaultExecutorServiceLoader.INSTANCE, executorConfiguration);
 	}
 
-	public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
+	public StreamExecutionEnvironment(
+			final ExecutorServiceLoader executorServiceLoader,
+			final Configuration executorConfiguration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(executorConfiguration);
+		this.userClassloader = getClass().getClassLoader();
+	}
+
+	protected void setUserClassloader(final ClassLoader userClassloader) {
+		this.userClassloader = checkNotNull(userClassloader);
 	}
 
 	protected Configuration getConfiguration() {
@@ -1552,7 +1562,13 @@ public class StreamExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		return executor.execute(streamGraph, configuration);
+
+		try (final JobClient jobClient = executor.execute(streamGraph, configuration).get()) {
+
+			return configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: jobClient.getJobSubmissionResult().get();
+		}
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
similarity index 60%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
rename to flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
index 2a1bb4a..7caf531 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryAndJobClientTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.util.OptionalFailure;
@@ -35,25 +36,43 @@ import javax.annotation.Nonnull;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 import static org.hamcrest.core.IsEqual.equalTo;
 
 /**
- * Tests the {@link ExecutorFactory} discovery in the {@link StreamExecutionEnvironment}.
+ * Tests the {@link ExecutorFactory} discovery in the {@link StreamExecutionEnvironment} and the calls of the {@link JobClient}.
  */
-public class ExecutorDiscoveryTest {
+public class ExecutorDiscoveryAndJobClientTest {
+
+	private static final String EXEC_NAME = "test-executor";
+	private static final long ATTACHED_RUNTIME = 42L;
+	private static final long DETACHED_RUNTIME = 11L;
+
+	@Test
+	public void jobClientGetJobExecutionResultShouldBeCalledOnAttachedExecution() throws Exception {
+		testHelper(true, ATTACHED_RUNTIME);
+	}
 
 	@Test
-	public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
+	public void jobClientGetJobSubmissionResultShouldBeCalledOnDetachedExecution() throws Exception {
+		testHelper(false, DETACHED_RUNTIME);
+	}
+
+	private void testHelper(final boolean attached, final long expectedRuntime) throws Exception {
 		final Configuration configuration = new Configuration();
-		configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+		configuration.set(DeploymentOptions.TARGET, EXEC_NAME);
+		configuration.set(DeploymentOptions.ATTACHED, attached);
 
 		final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
 
 		final String executorName = result.getAllAccumulatorResults().get(DeploymentOptions.TARGET.key()).toString();
-		assertThat(executorName, is(equalTo(IDReportingExecutorFactory.ID)));
+		assertThat(executorName, is(equalTo(EXEC_NAME)));
+
+		final long runtime = result.getNetRuntime();
+		assertThat(runtime, is(equalTo(expectedRuntime)));
 	}
 
 	private JobExecutionResult executeTestJobBasedOnConfig(final Configuration configuration) throws Exception {
@@ -69,19 +88,39 @@ public class ExecutorDiscoveryTest {
 	 */
 	public static class IDReportingExecutorFactory implements ExecutorFactory {
 
-		public static final String ID = "test-executor-A";
-
 		@Override
 		public boolean isCompatibleWith(@Nonnull Configuration configuration) {
-			return ID.equals(configuration.get(DeploymentOptions.TARGET));
+			return EXEC_NAME.equals(configuration.get(DeploymentOptions.TARGET));
 		}
 
 		@Override
 		public Executor getExecutor(@Nonnull Configuration configuration) {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
-				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
-				return new JobExecutionResult(new JobID(), 12L, res);
+				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(EXEC_NAME));
+
+				return CompletableFuture.completedFuture(new JobClient(){
+
+					@Override
+					public JobID getJobID() {
+						return new JobID();
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), DETACHED_RUNTIME, res));
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), ATTACHED_RUNTIME, res));
+					}
+
+					@Override
+					public void close() {
+
+					}
+				});
 			};
 		}
 	}
diff --git a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
index 68ddbcb..a5186ae 100644
--- a/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
+++ b/flink-streaming-java/src/test/resources/META-INF/services/org.apache.flink.core.execution.ExecutorFactory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.flink.streaming.environment.ExecutorDiscoveryTest$IDReportingExecutorFactory
\ No newline at end of file
+org.apache.flink.streaming.environment.ExecutorDiscoveryAndJobClientTest$IDReportingExecutorFactory
\ No newline at end of file


[flink] 13/16: [FLINK-XXXXX] Fix completeness tests

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 044645b401f738f8ca345cc968a55d2a1940c3a2
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 19 05:22:51 2019 +0100

    [FLINK-XXXXX] Fix completeness tests
---
 .../pyflink/dataset/tests/test_execution_environment_completeness.py  | 3 ++-
 .../flink/streaming/api/environment/StreamExecutionEnvironment.java   | 4 ----
 .../flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala  | 3 +++
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
index a86db7b..7277e4e 100644
--- a/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
+++ b/flink-python/pyflink/dataset/tests/test_execution_environment_completeness.py
@@ -49,7 +49,8 @@ class ExecutionEnvironmentCompletenessTests(PythonAPICompletenessTestCase,
                 'getIdString', 'setSessionTimeout', 'fromElements', 'createRemoteEnvironment',
                 'startNewSession', 'fromCollection', 'readTextFileWithValue', 'registerDataSink',
                 'createCollectionsEnvironment', 'readFile', 'readFileOfPrimitives',
-                'generateSequence', 'areExplicitEnvironmentsAllowed', 'createInput'}
+                'generateSequence', 'areExplicitEnvironmentsAllowed', 'createInput',
+                'getUserCodeClassLoader', 'getExecutorServiceLoader', 'getConfiguration'}
 
 
 if __name__ == '__main__':
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index ea50d04..b8544cf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -179,10 +179,6 @@ public class StreamExecutionEnvironment {
 		this.userClassloader = checkNotNull(userClassloader);
 	}
 
-	public ClassLoader getUserCodeClassLoader() {
-		return userClassloader;
-	}
-
 	protected Configuration getConfiguration() {
 		return this.configuration;
 	}
diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
index 6dc2ac5..e0666cf 100644
--- a/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
+++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/completeness/BatchScalaAPICompletenessTest.scala
@@ -55,6 +55,9 @@ class BatchScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
        "org.apache.flink.api.java.operators.TwoInputOperator.getInput2Type",
        "org.apache.flink.api.java.ExecutionEnvironment.areExplicitEnvironmentsAllowed",
        "org.apache.flink.api.java.ExecutionEnvironment.resetContextEnvironment",
+       "org.apache.flink.api.java.ExecutionEnvironment.getUserCodeClassLoader",
+       "org.apache.flink.api.java.ExecutionEnvironment.getExecutorServiceLoader",
+       "org.apache.flink.api.java.ExecutionEnvironment.getConfiguration",
 
        // TypeHints are only needed for Java API, Scala API doesn't need them
        "org.apache.flink.api.java.operators.SingleInputUdfOperator.returns",


[flink] 02/16: [hotfix] Ignore empty yarn DYNAMIC_PROPERTIES when creating configuration

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b26d305adb9a09ab23d40cae6f862a0651fc3cf1
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 16:50:18 2019 +0100

    [hotfix] Ignore empty yarn DYNAMIC_PROPERTIES when creating configuration
---
 .../src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java  | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 3fb8dfa..362deda 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -416,7 +416,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 		}
 
 		final String dynamicPropertiesEncoded = encodeDynamicProperties(commandLine);
-		configuration.setString(YarnConfigOptionsInternal.DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+		if (dynamicPropertiesEncoded != null && !dynamicPropertiesEncoded.isEmpty()) {
+			configuration.setString(YarnConfigOptionsInternal.DYNAMIC_PROPERTIES, dynamicPropertiesEncoded);
+		}
 
 		final boolean detached = commandLine.hasOption(YARN_DETACHED_OPTION.getOpt()) || commandLine.hasOption(DETACHED_OPTION.getOpt());
 		configuration.setBoolean(DeploymentOptions.ATTACHED, !detached);


[flink] 12/16: [FLINK-XXXXX] Refactoring the ContextEnvironments

Posted by kk...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 051b9edc4b555e47093644ad10ec1c6ec756e14e
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 20:58:57 2019 +0100

    [FLINK-XXXXX] Refactoring the ContextEnvironments
---
 .../java/org/apache/flink/client/ClientUtils.java  |  6 +-
 .../client/cli/AbstractCustomCommandLine.java      |  3 +-
 .../org/apache/flink/client/cli/CliFrontend.java   | 10 +--
 .../flink/client/program/ContextEnvironment.java   | 92 ++++------------------
 .../client/program/ContextEnvironmentFactory.java  | 11 +--
 .../flink/client/cli/CliFrontendRunTest.java       |  3 +-
 .../apache/flink/client/program/ClientTest.java    | 67 ++++++++++++++--
 .../flink/api/java/ExecutionEnvironment.java       | 10 ++-
 .../api/environment/StreamContextEnvironment.java  | 36 +++------
 .../environment/StreamExecutionEnvironment.java    |  4 +
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  6 +-
 11 files changed, 122 insertions(+), 126 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 1654dff..f599478 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -145,10 +146,11 @@ public enum ClientUtils {
 	}
 
 	public static JobSubmissionResult executeProgram(
+			ExecutorServiceLoader executorServiceLoader,
 			Configuration configuration,
-			ClusterClient<?> client,
 			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
 
+		checkNotNull(executorServiceLoader);
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
 
 		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -160,8 +162,8 @@ public enum ClientUtils {
 			final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
+					executorServiceLoader,
 					configuration,
-					client,
 					userCodeClassLoader,
 					jobExecutionResult);
 			ContextEnvironment.setAsContext(factory);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index f32d4f8..b8431cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.cli;
 
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -73,7 +74,7 @@ public abstract class AbstractCustomCommandLine implements CustomCommandLine {
 	@Override
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		final Configuration resultingConfiguration = new Configuration(configuration);
-		resultingConfiguration.setString(DeploymentOptions.TARGET, getId());
+		resultingConfiguration.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
 
 		if (commandLine.hasOption(addressOption.getOpt())) {
 			String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index d82b377..e8f8179 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -44,6 +44,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,7 +291,7 @@ public class CliFrontend {
 					int userParallelism = executionParameters.getParallelism();
 					LOG.debug("User parallelism is set to {}", userParallelism);
 
-					executeProgram(configuration, program, client);
+					executeProgram(configuration, program);
 				} finally {
 					if (clusterId == null && !executionParameters.getDetachedMode()) {
 						// terminate the cluster only if we have started it before and if it's not detached
@@ -750,12 +751,11 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected void executeProgram(
-			Configuration configuration,
-			PackagedProgram program,
-			ClusterClient<?> client) throws ProgramMissingJobException, ProgramInvocationException {
+			final Configuration configuration,
+			final PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
 		logAndSysout("Starting execution of program");
 
-		JobSubmissionResult result = ClientUtils.executeProgram(configuration, client, program);
+		JobSubmissionResult result = ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program);
 
 		if (result.isJobExecutionResult()) {
 			logAndSysout("Program execution finished");
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9a03271..4561da4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -21,17 +21,12 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,44 +36,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-	private final ClusterClient<?> client;
-
-	private final boolean detached;
-
-	private final List<URL> jarFilesToAttach;
-
-	private final List<URL> classpathsToAttach;
-
-	private final ClassLoader userCodeClassLoader;
-
-	private final SavepointRestoreSettings savepointSettings;
-
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
-	public ContextEnvironment(
+	ContextEnvironment(
+			final ExecutorServiceLoader executorServiceLoader,
 			final Configuration configuration,
-			final ClusterClient<?> remoteConnection,
 			final ClassLoader userCodeClassLoader,
 			final AtomicReference<JobExecutionResult> jobExecutionResult) {
-
-		this.client = checkNotNull(remoteConnection);
-		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		super(executorServiceLoader, configuration);
 		this.jobExecutionResult = checkNotNull(jobExecutionResult);
 
-		final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
-				.fromConfiguration(checkNotNull(configuration));
+		setUserClassloader(checkNotNull(userCodeClassLoader));
 
-		if (accessor.getParallelism() > 0) {
-			setParallelism(accessor.getParallelism());
+		final int parallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+		if (parallelism > 0) {
+			setParallelism(parallelism);
 		}
 
-		this.jarFilesToAttach = accessor.getJars();
-		this.classpathsToAttach = accessor.getClasspaths();
-		this.savepointSettings = accessor.getSavepointRestoreSettings();
-		this.detached = accessor.getDetachedMode();
-
 		this.alreadyCalled = false;
 	}
 
@@ -86,29 +62,13 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		verifyExecuteIsCalledOnceWhenInDetachedMode();
 
-		Plan plan = createProgramPlan(jobName);
-
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				plan,
-				client.getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(this.jarFilesToAttach);
-		jobGraph.setClasspaths(this.classpathsToAttach);
-
-		if (detached) {
-			lastJobExecutionResult = ClientUtils.submitJob(client, jobGraph);
-		} else {
-			lastJobExecutionResult = ClientUtils.submitJobAndWaitForResult(client, jobGraph, userCodeClassLoader).getJobExecutionResult();
-		}
-
-		setJobExecutionResult(lastJobExecutionResult);
-
-		return lastJobExecutionResult;
+		final JobExecutionResult jobExecutionResult = super.execute(jobName);
+		setJobExecutionResult(jobExecutionResult);
+		return jobExecutionResult;
 	}
 
 	private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
-		if (alreadyCalled && detached) {
+		if (alreadyCalled && !getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
 			throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
 		}
 		alreadyCalled = true;
@@ -123,30 +83,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ")";
 	}
 
-	public ClusterClient<?> getClient() {
-		return this.client;
-	}
-
-	public List<URL> getJars(){
-		return jarFilesToAttach;
-	}
-
-	public List<URL> getClasspaths(){
-		return classpathsToAttach;
-	}
-
-	public ClassLoader getUserCodeClassLoader() {
-		return userCodeClassLoader;
-	}
-
-	public SavepointRestoreSettings getSavepointRestoreSettings() {
-		return savepointSettings;
-	}
-
-	public boolean isDetached() {
-		return detached;
-	}
-
 	// --------------------------------------------------------------------------------------------
 
 	public static void setAsContext(ContextEnvironmentFactory factory) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ab589f2..1092c53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -36,9 +37,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final Configuration configuration;
+	private final ExecutorServiceLoader executorServiceLoader;
 
-	private final ClusterClient<?> client;
+	private final Configuration configuration;
 
 	private final ClassLoader userCodeClassLoader;
 
@@ -47,12 +48,12 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 	private boolean alreadyCalled;
 
 	public ContextEnvironmentFactory(
+			final ExecutorServiceLoader executorServiceLoader,
 			final Configuration configuration,
-			final ClusterClient<?> client,
 			final ClassLoader userCodeClassLoader,
 			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(configuration);
-		this.client = checkNotNull(client);
 		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
 		this.jobExecutionResult = checkNotNull(jobExecutionResult);
 
@@ -63,8 +64,8 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 	public ExecutionEnvironment createExecutionEnvironment() {
 		verifyCreateIsCalledOnceWhenInDetachedMode();
 		return new ContextEnvironment(
+				executorServiceLoader,
 				configuration,
-				client,
 				userCodeClassLoader,
 				jobExecutionResult);
 	}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index a0d551b..f6c536e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
 
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -198,7 +197,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		}
 
 		@Override
-		protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+		protected void executeProgram(Configuration configuration, PackagedProgram program) {
 			final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 			assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
 			assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 6e53abb..1e73ac9f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.ProgramDescription;
@@ -30,6 +31,8 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.JobClientImpl;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
@@ -37,6 +40,9 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -56,9 +62,13 @@ import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.annotation.Nonnull;
+
 import java.net.URL;
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -80,6 +90,8 @@ public class ClientTest extends TestLogger {
 
 	private Configuration config;
 
+	private static final String TEST_EXECUTOR_NAME = "test_executor";
+
 	private static final String ACCUMULATOR_NAME = "test_accumulator";
 
 	private static final String FAIL_MESSAGE = "Invalid program should have thrown ProgramInvocationException.";
@@ -100,6 +112,7 @@ public class ClientTest extends TestLogger {
 
 	private Configuration fromPackagedProgram(final PackagedProgram program, final int parallelism, final boolean detached) {
 		final Configuration configuration = new Configuration();
+		configuration.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME);
 		configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
 		configuration.set(DeploymentOptions.ATTACHED, !detached);
 		ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
@@ -116,7 +129,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -127,7 +141,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -138,7 +153,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -149,7 +165,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -160,7 +177,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -209,7 +227,7 @@ public class ClientTest extends TestLogger {
 		try {
 			final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
 			final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
-			ClientUtils.executeProgram(configuration, client, packagedProgramMock);
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(client, plan), configuration, packagedProgramMock);
 			fail("Creating the local execution environment should not be possible");
 		}
 		catch (InvalidProgramException e) {
@@ -346,4 +364,41 @@ public class ClientTest extends TestLogger {
 			env.execute().getAllAccumulatorResults();
 		}
 	}
+
+	private static final class TestExecutorServiceLoader implements ExecutorServiceLoader {
+
+		private final ClusterClient<?> clusterClient;
+
+		private final Plan plan;
+
+		TestExecutorServiceLoader(final ClusterClient<?> clusterClient, final Plan plan) {
+			this.clusterClient = checkNotNull(clusterClient);
+			this.plan = checkNotNull(plan);
+		}
+
+		@Override
+		public ExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
+			return new ExecutorFactory() {
+				@Override
+				public boolean isCompatibleWith(@Nonnull Configuration configuration) {
+					return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+				}
+
+				@Override
+				public Executor getExecutor(@Nonnull Configuration configuration) {
+					return (pipeline, config) -> {
+						final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+						final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, config, parallelism);
+
+						final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(config);
+						jobGraph.addJars(accessor.getJars());
+						jobGraph.setClasspaths(accessor.getClasspaths());
+
+						final JobID jobID = ClientUtils.submitJob(clusterClient, jobGraph).getJobID();
+						return CompletableFuture.completedFuture(new JobClientImpl<>(clusterClient, jobID));
+					};
+				}
+			};
+		}
+	}
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 6e64788..673af75 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -156,7 +156,15 @@ public class ExecutionEnvironment {
 		this.userClassloader = checkNotNull(userClassloader);
 	}
 
-	protected Configuration getConfiguration() {
+	public ClassLoader getUserCodeClassLoader() {
+		return userClassloader;
+	}
+
+	public ExecutorServiceLoader getExecutorServiceLoader() {
+		return executorServiceLoader;
+	}
+
+	public Configuration getConfiguration() {
 		return this.configuration;
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index bab31d3..3cdb483 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,12 +19,11 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
  * testing utilities create a {@link StreamExecutionEnvironment} that should be used when
@@ -35,37 +34,24 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	private final ContextEnvironment ctx;
 
-	protected StreamContextEnvironment(ContextEnvironment ctx) {
+	StreamContextEnvironment(final ContextEnvironment ctx) {
+		super(checkNotNull(ctx).getExecutorServiceLoader(), ctx.getConfiguration());
+
 		this.ctx = ctx;
-		if (ctx.getParallelism() > 0) {
-			setParallelism(ctx.getParallelism());
+
+		final int parallelism = ctx.getParallelism();
+		if (parallelism > 0) {
+			setParallelism(parallelism);
 		}
+		setUserClassloader(ctx.getUserCodeClassLoader());
 	}
 
 	@Override
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
 
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				streamGraph,
-				ctx.getClient().getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(ctx.getJars());
-		jobGraph.setClasspaths(ctx.getClasspaths());
-
-		// running from the CLI will override the savepoint restore settings
-		jobGraph.setSavepointRestoreSettings(ctx.getSavepointRestoreSettings());
-
-		JobExecutionResult jobExecutionResult;
-		if (ctx.isDetached()) {
-			jobExecutionResult = ClientUtils.submitJob(ctx.getClient(), jobGraph);
-		} else {
-			jobExecutionResult = ClientUtils.submitJobAndWaitForResult(ctx.getClient(), jobGraph, ctx.getUserCodeClassLoader());
-		}
-
+		final JobExecutionResult jobExecutionResult = super.execute(streamGraph);
 		ctx.setJobExecutionResult(jobExecutionResult);
-
 		return jobExecutionResult;
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b8544cf..ea50d04 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -179,6 +179,10 @@ public class StreamExecutionEnvironment {
 		this.userClassloader = checkNotNull(userClassloader);
 	}
 
+	public ClassLoader getUserCodeClassLoader() {
+		return userClassloader;
+	}
+
 	protected Configuration getConfiguration() {
 		return this.configuration;
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 362deda..388dea0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -47,6 +47,8 @@ import org.apache.flink.yarn.YarnClusterClientFactory;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -346,7 +348,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		// we ignore the addressOption because it can only contain "yarn-cluster"
 		final Configuration effectiveConfiguration = new Configuration(configuration);
-		effectiveConfiguration.setString(DeploymentOptions.TARGET, getId());
 
 		applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);
 
@@ -361,6 +362,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 
 			effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
 			effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
+			effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
+		} else {
+			effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
 		}
 
 		if (commandLine.hasOption(jmMemory.getOpt())) {