You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/19 06:15:01 UTC

[flink] 12/16: [FLINK-XXXXX] Refactoring the ContextEnvironments

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executors-clean
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 051b9edc4b555e47093644ad10ec1c6ec756e14e
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Mon Nov 18 20:58:57 2019 +0100

    [FLINK-XXXXX] Refactoring the ContextEnvironments
---
 .../java/org/apache/flink/client/ClientUtils.java  |  6 +-
 .../client/cli/AbstractCustomCommandLine.java      |  3 +-
 .../org/apache/flink/client/cli/CliFrontend.java   | 10 +--
 .../flink/client/program/ContextEnvironment.java   | 92 ++++------------------
 .../client/program/ContextEnvironmentFactory.java  | 11 +--
 .../flink/client/cli/CliFrontendRunTest.java       |  3 +-
 .../apache/flink/client/program/ClientTest.java    | 67 ++++++++++++++--
 .../flink/api/java/ExecutionEnvironment.java       | 10 ++-
 .../api/environment/StreamContextEnvironment.java  | 36 +++------
 .../environment/StreamExecutionEnvironment.java    |  4 +
 .../apache/flink/yarn/cli/FlinkYarnSessionCli.java |  6 +-
 11 files changed, 122 insertions(+), 126 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 1654dff..f599478 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,6 +30,7 @@ import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -145,10 +146,11 @@ public enum ClientUtils {
 	}
 
 	public static JobSubmissionResult executeProgram(
+			ExecutorServiceLoader executorServiceLoader,
 			Configuration configuration,
-			ClusterClient<?> client,
 			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
 
+		checkNotNull(executorServiceLoader);
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
 
 		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
@@ -160,8 +162,8 @@ public enum ClientUtils {
 			final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
+					executorServiceLoader,
 					configuration,
-					client,
 					userCodeClassLoader,
 					jobExecutionResult);
 			ContextEnvironment.setAsContext(factory);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index f32d4f8..b8431cf 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.cli;
 
+import org.apache.flink.client.deployment.executors.StandaloneSessionClusterExecutor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.HighAvailabilityOptions;
@@ -73,7 +74,7 @@ public abstract class AbstractCustomCommandLine implements CustomCommandLine {
 	@Override
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		final Configuration resultingConfiguration = new Configuration(configuration);
-		resultingConfiguration.setString(DeploymentOptions.TARGET, getId());
+		resultingConfiguration.setString(DeploymentOptions.TARGET, StandaloneSessionClusterExecutor.NAME);
 
 		if (commandLine.hasOption(addressOption.getOpt())) {
 			String addressWithPort = commandLine.getOptionValue(addressOption.getOpt());
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index d82b377..e8f8179 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -44,6 +44,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -290,7 +291,7 @@ public class CliFrontend {
 					int userParallelism = executionParameters.getParallelism();
 					LOG.debug("User parallelism is set to {}", userParallelism);
 
-					executeProgram(configuration, program, client);
+					executeProgram(configuration, program);
 				} finally {
 					if (clusterId == null && !executionParameters.getDetachedMode()) {
 						// terminate the cluster only if we have started it before and if it's not detached
@@ -750,12 +751,11 @@ public class CliFrontend {
 	// --------------------------------------------------------------------------------------------
 
 	protected void executeProgram(
-			Configuration configuration,
-			PackagedProgram program,
-			ClusterClient<?> client) throws ProgramMissingJobException, ProgramInvocationException {
+			final Configuration configuration,
+			final PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
 		logAndSysout("Starting execution of program");
 
-		JobSubmissionResult result = ClientUtils.executeProgram(configuration, client, program);
+		JobSubmissionResult result = ClientUtils.executeProgram(DefaultExecutorServiceLoader.INSTANCE, configuration, program);
 
 		if (result.isJobExecutionResult()) {
 			logAndSysout("Program execution finished");
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9a03271..4561da4 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -21,17 +21,12 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,44 +36,25 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-	private final ClusterClient<?> client;
-
-	private final boolean detached;
-
-	private final List<URL> jarFilesToAttach;
-
-	private final List<URL> classpathsToAttach;
-
-	private final ClassLoader userCodeClassLoader;
-
-	private final SavepointRestoreSettings savepointSettings;
-
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
-	public ContextEnvironment(
+	ContextEnvironment(
+			final ExecutorServiceLoader executorServiceLoader,
 			final Configuration configuration,
-			final ClusterClient<?> remoteConnection,
 			final ClassLoader userCodeClassLoader,
 			final AtomicReference<JobExecutionResult> jobExecutionResult) {
-
-		this.client = checkNotNull(remoteConnection);
-		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+		super(executorServiceLoader, configuration);
 		this.jobExecutionResult = checkNotNull(jobExecutionResult);
 
-		final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
-				.fromConfiguration(checkNotNull(configuration));
+		setUserClassloader(checkNotNull(userCodeClassLoader));
 
-		if (accessor.getParallelism() > 0) {
-			setParallelism(accessor.getParallelism());
+		final int parallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+		if (parallelism > 0) {
+			setParallelism(parallelism);
 		}
 
-		this.jarFilesToAttach = accessor.getJars();
-		this.classpathsToAttach = accessor.getClasspaths();
-		this.savepointSettings = accessor.getSavepointRestoreSettings();
-		this.detached = accessor.getDetachedMode();
-
 		this.alreadyCalled = false;
 	}
 
@@ -86,29 +62,13 @@ public class ContextEnvironment extends ExecutionEnvironment {
 	public JobExecutionResult execute(String jobName) throws Exception {
 		verifyExecuteIsCalledOnceWhenInDetachedMode();
 
-		Plan plan = createProgramPlan(jobName);
-
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				plan,
-				client.getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(this.jarFilesToAttach);
-		jobGraph.setClasspaths(this.classpathsToAttach);
-
-		if (detached) {
-			lastJobExecutionResult = ClientUtils.submitJob(client, jobGraph);
-		} else {
-			lastJobExecutionResult = ClientUtils.submitJobAndWaitForResult(client, jobGraph, userCodeClassLoader).getJobExecutionResult();
-		}
-
-		setJobExecutionResult(lastJobExecutionResult);
-
-		return lastJobExecutionResult;
+		final JobExecutionResult jobExecutionResult = super.execute(jobName);
+		setJobExecutionResult(jobExecutionResult);
+		return jobExecutionResult;
 	}
 
 	private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
-		if (alreadyCalled && detached) {
+		if (alreadyCalled && !getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
 			throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
 		}
 		alreadyCalled = true;
@@ -123,30 +83,6 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ")";
 	}
 
-	public ClusterClient<?> getClient() {
-		return this.client;
-	}
-
-	public List<URL> getJars(){
-		return jarFilesToAttach;
-	}
-
-	public List<URL> getClasspaths(){
-		return classpathsToAttach;
-	}
-
-	public ClassLoader getUserCodeClassLoader() {
-		return userCodeClassLoader;
-	}
-
-	public SavepointRestoreSettings getSavepointRestoreSettings() {
-		return savepointSettings;
-	}
-
-	public boolean isDetached() {
-		return detached;
-	}
-
 	// --------------------------------------------------------------------------------------------
 
 	public static void setAsContext(ContextEnvironmentFactory factory) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index ab589f2..1092c53 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -36,9 +37,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final Configuration configuration;
+	private final ExecutorServiceLoader executorServiceLoader;
 
-	private final ClusterClient<?> client;
+	private final Configuration configuration;
 
 	private final ClassLoader userCodeClassLoader;
 
@@ -47,12 +48,12 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 	private boolean alreadyCalled;
 
 	public ContextEnvironmentFactory(
+			final ExecutorServiceLoader executorServiceLoader,
 			final Configuration configuration,
-			final ClusterClient<?> client,
 			final ClassLoader userCodeClassLoader,
 			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(configuration);
-		this.client = checkNotNull(client);
 		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
 		this.jobExecutionResult = checkNotNull(jobExecutionResult);
 
@@ -63,8 +64,8 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 	public ExecutionEnvironment createExecutionEnvironment() {
 		verifyCreateIsCalledOnceWhenInDetachedMode();
 		return new ContextEnvironment(
+				executorServiceLoader,
 				configuration,
-				client,
 				userCodeClassLoader,
 				jobExecutionResult);
 	}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index a0d551b..f6c536e 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
 
 import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -198,7 +197,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
 		}
 
 		@Override
-		protected void executeProgram(Configuration configuration, PackagedProgram program, ClusterClient client) {
+		protected void executeProgram(Configuration configuration, PackagedProgram program) {
 			final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 			assertEquals(isDetached, executionConfigAccessor.getDetachedMode());
 			assertEquals(expectedParallelism, executionConfigAccessor.getParallelism());
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 6e53abb..1e73ac9f 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.client.program;
 
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.ProgramDescription;
@@ -30,6 +31,8 @@ import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.client.ClientUtils;
 import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.JobClientImpl;
 import org.apache.flink.configuration.AkkaOptions;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
@@ -37,6 +40,9 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -56,9 +62,13 @@ import org.junit.experimental.categories.Category;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import javax.annotation.Nonnull;
+
 import java.net.URL;
 import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -80,6 +90,8 @@ public class ClientTest extends TestLogger {
 
 	private Configuration config;
 
+	private static final String TEST_EXECUTOR_NAME = "test_executor";
+
 	private static final String ACCUMULATOR_NAME = "test_accumulator";
 
 	private static final String FAIL_MESSAGE = "Invalid program should have thrown ProgramInvocationException.";
@@ -100,6 +112,7 @@ public class ClientTest extends TestLogger {
 
 	private Configuration fromPackagedProgram(final PackagedProgram program, final int parallelism, final boolean detached) {
 		final Configuration configuration = new Configuration();
+		configuration.setString(DeploymentOptions.TARGET, TEST_EXECUTOR_NAME);
 		configuration.set(CoreOptions.DEFAULT_PARALLELISM, parallelism);
 		configuration.set(DeploymentOptions.ATTACHED, !detached);
 		ConfigUtils.encodeCollectionToConfig(configuration, PipelineOptions.CLASSPATHS, program.getClasspaths(), URL::toString);
@@ -116,7 +129,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -127,7 +141,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -138,7 +153,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -149,7 +165,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -160,7 +177,8 @@ public class ClientTest extends TestLogger {
 		try {
 			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
 			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
+
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(clusterClient, plan), configuration, prg);
 			fail(FAIL_MESSAGE);
 		} catch (ProgramInvocationException e) {
 			assertEquals(
@@ -209,7 +227,7 @@ public class ClientTest extends TestLogger {
 		try {
 			final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
 			final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
-			ClientUtils.executeProgram(configuration, client, packagedProgramMock);
+			ClientUtils.executeProgram(new TestExecutorServiceLoader(client, plan), configuration, packagedProgramMock);
 			fail("Creating the local execution environment should not be possible");
 		}
 		catch (InvalidProgramException e) {
@@ -346,4 +364,41 @@ public class ClientTest extends TestLogger {
 			env.execute().getAllAccumulatorResults();
 		}
 	}
+
+	private static final class TestExecutorServiceLoader implements ExecutorServiceLoader {
+
+		private final ClusterClient<?> clusterClient;
+
+		private final Plan plan;
+
+		TestExecutorServiceLoader(final ClusterClient<?> clusterClient, final Plan plan) {
+			this.clusterClient = checkNotNull(clusterClient);
+			this.plan = checkNotNull(plan);
+		}
+
+		@Override
+		public ExecutorFactory getExecutorFactory(@Nonnull Configuration configuration) {
+			return new ExecutorFactory() {
+				@Override
+				public boolean isCompatibleWith(@Nonnull Configuration configuration) {
+					return TEST_EXECUTOR_NAME.equalsIgnoreCase(configuration.getString(DeploymentOptions.TARGET));
+				}
+
+				@Override
+				public Executor getExecutor(@Nonnull Configuration configuration) {
+					return (pipeline, config) -> {
+						final int parallelism = config.getInteger(CoreOptions.DEFAULT_PARALLELISM);
+						final JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(plan, config, parallelism);
+
+						final ExecutionConfigAccessor accessor = ExecutionConfigAccessor.fromConfiguration(config);
+						jobGraph.addJars(accessor.getJars());
+						jobGraph.setClasspaths(accessor.getClasspaths());
+
+						final JobID jobID = ClientUtils.submitJob(clusterClient, jobGraph).getJobID();
+						return CompletableFuture.completedFuture(new JobClientImpl<>(clusterClient, jobID));
+					};
+				}
+			};
+		}
+	}
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 6e64788..673af75 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -156,7 +156,15 @@ public class ExecutionEnvironment {
 		this.userClassloader = checkNotNull(userClassloader);
 	}
 
-	protected Configuration getConfiguration() {
+	public ClassLoader getUserCodeClassLoader() {
+		return userClassloader;
+	}
+
+	public ExecutorServiceLoader getExecutorServiceLoader() {
+		return executorServiceLoader;
+	}
+
+	public Configuration getConfiguration() {
 		return this.configuration;
 	}
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index bab31d3..3cdb483 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,12 +19,11 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
  * testing utilities create a {@link StreamExecutionEnvironment} that should be used when
@@ -35,37 +34,24 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	private final ContextEnvironment ctx;
 
-	protected StreamContextEnvironment(ContextEnvironment ctx) {
+	StreamContextEnvironment(final ContextEnvironment ctx) {
+		super(checkNotNull(ctx).getExecutorServiceLoader(), ctx.getConfiguration());
+
 		this.ctx = ctx;
-		if (ctx.getParallelism() > 0) {
-			setParallelism(ctx.getParallelism());
+
+		final int parallelism = ctx.getParallelism();
+		if (parallelism > 0) {
+			setParallelism(parallelism);
 		}
+		setUserClassloader(ctx.getUserCodeClassLoader());
 	}
 
 	@Override
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
 
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				streamGraph,
-				ctx.getClient().getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(ctx.getJars());
-		jobGraph.setClasspaths(ctx.getClasspaths());
-
-		// running from the CLI will override the savepoint restore settings
-		jobGraph.setSavepointRestoreSettings(ctx.getSavepointRestoreSettings());
-
-		JobExecutionResult jobExecutionResult;
-		if (ctx.isDetached()) {
-			jobExecutionResult = ClientUtils.submitJob(ctx.getClient(), jobGraph);
-		} else {
-			jobExecutionResult = ClientUtils.submitJobAndWaitForResult(ctx.getClient(), jobGraph, ctx.getUserCodeClassLoader());
-		}
-
+		final JobExecutionResult jobExecutionResult = super.execute(streamGraph);
 		ctx.setJobExecutionResult(jobExecutionResult);
-
 		return jobExecutionResult;
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index b8544cf..ea50d04 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -179,6 +179,10 @@ public class StreamExecutionEnvironment {
 		this.userClassloader = checkNotNull(userClassloader);
 	}
 
+	public ClassLoader getUserCodeClassLoader() {
+		return userClassloader;
+	}
+
 	protected Configuration getConfiguration() {
 		return this.configuration;
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index 362deda..388dea0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -47,6 +47,8 @@ import org.apache.flink.yarn.YarnClusterClientFactory;
 import org.apache.flink.yarn.YarnClusterDescriptor;
 import org.apache.flink.yarn.configuration.YarnConfigOptions;
 import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.executors.YarnJobClusterExecutor;
+import org.apache.flink.yarn.executors.YarnSessionClusterExecutor;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.HelpFormatter;
@@ -346,7 +348,6 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 	public Configuration applyCommandLineOptionsToConfiguration(CommandLine commandLine) throws FlinkException {
 		// we ignore the addressOption because it can only contain "yarn-cluster"
 		final Configuration effectiveConfiguration = new Configuration(configuration);
-		effectiveConfiguration.setString(DeploymentOptions.TARGET, getId());
 
 		applyDescriptorOptionToConfig(commandLine, effectiveConfiguration);
 
@@ -361,6 +362,9 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 
 			effectiveConfiguration.setString(HA_CLUSTER_ID, zooKeeperNamespace);
 			effectiveConfiguration.setString(YarnConfigOptions.APPLICATION_ID, ConverterUtils.toString(applicationId));
+			effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnSessionClusterExecutor.NAME);
+		} else {
+			effectiveConfiguration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME);
 		}
 
 		if (commandLine.hasOption(jmMemory.getOpt())) {