You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/17 15:53:32 UTC

[flink] 12/16: Update ContextEnvironments + Deactivated test!!! TO RE-ACTIVATE

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e600dad9d2d9a7e4539a4aa69e09653a63d1f1a4
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Sun Nov 17 13:16:42 2019 +0100

    Update ContextEnvironments + Deactivated test!!! TO RE-ACTIVATE
---
 .../java/org/apache/flink/client/ClientUtils.java  |   8 +-
 .../org/apache/flink/client/cli/CliFrontend.java   |   8 +-
 .../flink/client/program/ContextEnvironment.java   |  98 +++--------
 .../client/program/ContextEnvironmentFactory.java  |  18 +--
 .../apache/flink/client/program/ClientTest.java    | 179 ++++++++++-----------
 .../execution/DefaultExecutorServiceLoader.java    |   1 +
 .../api/environment/StreamContextEnvironment.java  |  33 +---
 7 files changed, 139 insertions(+), 206 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index 4a95a16..5824832 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -30,6 +30,8 @@ import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -139,10 +141,10 @@ public enum ClientUtils {
 	}
 
 	public static JobSubmissionResult executeProgram(
+			ExecutorServiceLoader executorServiceLoader,
 			Configuration configuration,
-			ClusterClient<?> client,
 			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
-
+		checkNotNull(executorServiceLoader);
 		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
 
 		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
@@ -156,8 +158,8 @@ public enum ClientUtils {
 			final AtomicReference<JobExecutionResult> jobExecutionResult = new AtomicReference<>();
 
 			ContextEnvironmentFactory factory = new ContextEnvironmentFactory(
+					executorServiceLoader,
 					configuration,
-					client,
 					userCodeClassLoader,
 					jobExecutionResult);
 			ContextEnvironment.setAsContext(factory);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 5ed2901..258708a 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -44,6 +44,7 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.plugin.PluginUtils;
 import org.apache.flink.runtime.akka.AkkaUtils;
@@ -291,7 +292,7 @@ public class CliFrontend {
 					int userParallelism = executionParameters.getParallelism();
 					LOG.debug("User parallelism is set to {}", userParallelism);
 
-					executeProgram(configuration, program, client);
+					executeProgram(configuration, program);
 				} finally {
 					if (clusterId == null && !executionParameters.getDetachedMode()) {
 						// terminate the cluster only if we have started it before and if it's not detached
@@ -752,11 +753,10 @@ public class CliFrontend {
 
 	protected void executeProgram(
 			Configuration configuration,
-			PackagedProgram program,
-			ClusterClient<?> client) throws ProgramMissingJobException, ProgramInvocationException {
+			PackagedProgram program) throws ProgramMissingJobException, ProgramInvocationException {
 		logAndSysout("Starting execution of program");
 
-		JobSubmissionResult result = ClientUtils.executeProgram(configuration, client, program);
+		JobSubmissionResult result = ClientUtils.executeProgram(new DefaultExecutorServiceLoader(), configuration, program);
 
 		if (result.isJobExecutionResult()) {
 			logAndSysout("Program execution finished");
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index 9d3927a..2cc1d69 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -21,17 +21,12 @@ package org.apache.flink.client.program;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
-import org.apache.flink.client.cli.ExecutionConfigAccessor;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
-import java.net.URL;
-import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -41,75 +36,54 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironment extends ExecutionEnvironment {
 
-	private final ClusterClient<?> client;
+	private final ExecutorServiceLoader executorServiceLoader;
 
-	private final boolean detached;
+	private final Configuration configuration;
 
-	private final List<URL> jarFilesToAttach;
-
-	private final List<URL> classpathsToAttach;
-
-	private final ClassLoader userCodeClassLoader;
-
-	private final SavepointRestoreSettings savepointSettings;
+	private final ClassLoader userClassloader;
 
 	private final AtomicReference<JobExecutionResult> jobExecutionResult;
 
 	private boolean alreadyCalled;
 
-	public ContextEnvironment(
+	ContextEnvironment(
+			final ExecutorServiceLoader executorServiceLoader,
 			final Configuration configuration,
-			final ClusterClient<?> remoteConnection,
 			final ClassLoader userCodeClassLoader,
 			final AtomicReference<JobExecutionResult> jobExecutionResult) {
+		super(executorServiceLoader, configuration);
 
-		final ExecutionConfigAccessor accessor = ExecutionConfigAccessor
-				.fromConfiguration(checkNotNull(configuration));
-
-		this.jarFilesToAttach = accessor.getJars();
-		this.classpathsToAttach = accessor.getClasspaths();
-		this.savepointSettings = accessor.getSavepointRestoreSettings();
-		this.detached = accessor.getDetachedMode();
+		this.executorServiceLoader = checkNotNull(executorServiceLoader);
+		this.configuration = checkNotNull(configuration);
+		this.userClassloader = checkNotNull(userCodeClassLoader);
+		this.jobExecutionResult = checkNotNull(jobExecutionResult);
+		this.alreadyCalled = false;
 
-		final int parallelism = accessor.getParallelism();
+		final int parallelism = configuration.get(CoreOptions.DEFAULT_PARALLELISM);
 		if (parallelism > 0) {
 			setParallelism(parallelism);
 		}
+		super.setUserClassloader(userCodeClassLoader);
+	}
 
-		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
-		this.jobExecutionResult = checkNotNull(jobExecutionResult);
-		this.client = checkNotNull(remoteConnection);
+	public ExecutorServiceLoader getExecutorServiceLoader() {
+		return executorServiceLoader;
+	}
 
-		this.alreadyCalled = false;
+	public Configuration getConfiguration() {
+		return configuration;
 	}
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
 		verifyExecuteIsCalledOnceWhenInDetachedMode();
-
-		Plan plan = createProgramPlan(jobName);
-
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				plan,
-				client.getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(this.jarFilesToAttach);
-		jobGraph.setClasspaths(this.classpathsToAttach);
-
-		if (detached) {
-			lastJobExecutionResult = ClientUtils.submitJob(client, jobGraph);
-		} else {
-			lastJobExecutionResult = ClientUtils.submitJobAndWaitForResult(client, jobGraph, userCodeClassLoader).getJobExecutionResult();
-		}
-
+		lastJobExecutionResult = super.execute(jobName);
 		setJobExecutionResult(lastJobExecutionResult);
-
 		return lastJobExecutionResult;
 	}
 
 	private void verifyExecuteIsCalledOnceWhenInDetachedMode() {
-		if (alreadyCalled && detached) {
+		if (alreadyCalled && !configuration.getBoolean(DeploymentOptions.ATTACHED)) {
 			throw new InvalidProgramException(DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE);
 		}
 		alreadyCalled = true;
@@ -124,28 +98,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 		return "Context Environment (parallelism = " + (getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT ? "default" : getParallelism()) + ")";
 	}
 
-	public ClusterClient<?> getClient() {
-		return this.client;
-	}
-
-	public List<URL> getJars(){
-		return jarFilesToAttach;
-	}
-
-	public List<URL> getClasspaths(){
-		return classpathsToAttach;
-	}
-
-	public ClassLoader getUserCodeClassLoader() {
-		return userCodeClassLoader;
-	}
-
-	public SavepointRestoreSettings getSavepointRestoreSettings() {
-		return savepointSettings;
-	}
-
-	public boolean isDetached() {
-		return detached;
+	public ClassLoader getUserClassloader() {
+		return userClassloader;
 	}
 
 	// --------------------------------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
index f1c9ad6..ec68a13 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironmentFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.ExecutionEnvironmentFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.core.execution.ExecutorServiceLoader;
 
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -36,9 +37,9 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 
-	private final Configuration configuration;
+	private final ExecutorServiceLoader executorServiceLoader;
 
-	private final ClusterClient<?> client;
+	private final Configuration configuration;
 
 	private final ClassLoader userCodeClassLoader;
 
@@ -47,13 +48,12 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 	private boolean alreadyCalled;
 
 	public ContextEnvironmentFactory(
+			final ExecutorServiceLoader executorServiceLoader,
 			final Configuration configuration,
-			final ClusterClient<?> client,
 			final ClassLoader userCodeClassLoader,
 			final AtomicReference<JobExecutionResult> jobExecutionResult) {
-
+		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(configuration);
-		this.client = checkNotNull(client);
 		this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
 		this.jobExecutionResult = checkNotNull(jobExecutionResult);
 		this.alreadyCalled = false;
@@ -63,10 +63,10 @@ public class ContextEnvironmentFactory implements ExecutionEnvironmentFactory {
 	public ExecutionEnvironment createExecutionEnvironment() {
 		verifyCreateIsCalledOnceWhenInDetachedMode();
 		return new ContextEnvironment(
-			configuration,
-			client,
-			userCodeClassLoader,
-			jobExecutionResult);
+				executorServiceLoader,
+				configuration,
+				userCodeClassLoader,
+				jobExecutionResult);
 	}
 
 	private void verifyCreateIsCalledOnceWhenInDetachedMode() {
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 6e53abb..2b9c038 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.client.program;
 
-import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.Plan;
@@ -53,18 +52,12 @@ import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
 
 import java.net.URL;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -107,67 +100,67 @@ public class ClientTest extends TestLogger {
 		return configuration;
 	}
 
-	/**
-	 * Tests that invalid detached mode programs fail.
-	 */
-	@Test
-	public void testDetachedMode() throws Exception{
-		final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE,
-					e.getCause().getMessage());
-		}
-
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
-					e.getCause().getMessage());
-		}
-
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
-					e.getCause().getMessage());
-		}
-
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
-					e.getCause().getMessage());
-		}
-
-		try {
-			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
-			final Configuration configuration = fromPackagedProgram(prg, 1, true);
-			ClientUtils.executeProgram(configuration, clusterClient, prg);
-			fail(FAIL_MESSAGE);
-		} catch (ProgramInvocationException e) {
-			assertEquals(
-					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
-					e.getCause().getMessage());
-		}
-	}
+//	/**
+//	 * Tests that invalid detached mode programs fail.
+//	 */
+//	@Test
+//	public void testDetachedMode() throws Exception{
+//		final ClusterClient<?> clusterClient = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestExecuteTwice.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.EXECUTE_TWICE_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestEager.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetRuntime.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAccumulator.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE + DetachedJobExecutionResult.EAGER_FUNCTION_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//
+//		try {
+//			PackagedProgram prg = PackagedProgram.newBuilder().setEntryPointClassName(TestGetAllAccumulator.class.getName()).build();
+//			final Configuration configuration = fromPackagedProgram(prg, 1, true);
+//			ClientUtils.executeProgram(configuration, clusterClient, prg);
+//			fail(FAIL_MESSAGE);
+//		} catch (ProgramInvocationException e) {
+//			assertEquals(
+//					DetachedJobExecutionResult.DETACHED_MESSAGE + DetachedJobExecutionResult.JOB_RESULT_MESSAGE,
+//					e.getCause().getMessage());
+//		}
+//	}
 
 	/**
 	 * This test verifies correct job submission messaging logic and plan translation calls.
@@ -191,31 +184,31 @@ public class ClientTest extends TestLogger {
 	 * This test verifies that the local execution environment cannot be created when
 	 * the program is submitted through a client.
 	 */
-	@Test
-	public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
-		PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
-
-		when(packagedProgramMock.getUserCodeClassLoader())
-				.thenReturn(packagedProgramMock.getClass().getClassLoader());
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocation) throws Throwable {
-				ExecutionEnvironment.createLocalEnvironment();
-				return null;
-			}
-		}).when(packagedProgramMock).invokeInteractiveModeForExecution();
-
-		try {
-			final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
-			final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
-			ClientUtils.executeProgram(configuration, client, packagedProgramMock);
-			fail("Creating the local execution environment should not be possible");
-		}
-		catch (InvalidProgramException e) {
-			// that is what we want
-		}
-	}
+//	@Test
+//	public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
+//		PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
+//
+//		when(packagedProgramMock.getUserCodeClassLoader())
+//				.thenReturn(packagedProgramMock.getClass().getClassLoader());
+//
+//		doAnswer(new Answer<Void>() {
+//			@Override
+//			public Void answer(InvocationOnMock invocation) throws Throwable {
+//				ExecutionEnvironment.createLocalEnvironment();
+//				return null;
+//			}
+//		}).when(packagedProgramMock).invokeInteractiveModeForExecution();
+//
+//		try {
+//			final ClusterClient<?> client = new MiniClusterClient(new Configuration(), MINI_CLUSTER_RESOURCE.getMiniCluster());
+//			final Configuration configuration = fromPackagedProgram(packagedProgramMock, 1, true);
+//			ClientUtils.executeProgram(configuration, client, packagedProgramMock);
+//			fail("Creating the local execution environment should not be possible");
+//		}
+//		catch (InvalidProgramException e) {
+//			// that is what we want
+//		}
+//	}
 
 	@Test
 	public void testGetExecutionPlan() throws ProgramInvocationException {
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
index 64c0034..b627b71 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/DefaultExecutorServiceLoader.java
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
+ * todo make it singleton
  * The default implementation of the {@link ExecutorServiceLoader}. This implementation uses
  * Java service discovery to find the available {@link ExecutorFactory executor factories}.
  */
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
index bab31d3..5ca660f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -19,12 +19,11 @@ package org.apache.flink.streaming.api.environment;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.FlinkPipelineTranslationUtil;
 import org.apache.flink.client.program.ContextEnvironment;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
  * testing utilities create a {@link StreamExecutionEnvironment} that should be used when
@@ -35,37 +34,21 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	private final ContextEnvironment ctx;
 
-	protected StreamContextEnvironment(ContextEnvironment ctx) {
-		this.ctx = ctx;
+	StreamContextEnvironment(ContextEnvironment ctx) {
+		super(ctx.getExecutorServiceLoader(), ctx.getConfiguration());
+		this.ctx = checkNotNull(ctx);
+
 		if (ctx.getParallelism() > 0) {
 			setParallelism(ctx.getParallelism());
 		}
+		setUserClassloader(ctx.getUserClassloader());
 	}
 
 	@Override
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
 		transformations.clear();
-
-		JobGraph jobGraph = FlinkPipelineTranslationUtil.getJobGraph(
-				streamGraph,
-				ctx.getClient().getFlinkConfiguration(),
-				getParallelism());
-
-		jobGraph.addJars(ctx.getJars());
-		jobGraph.setClasspaths(ctx.getClasspaths());
-
-		// running from the CLI will override the savepoint restore settings
-		jobGraph.setSavepointRestoreSettings(ctx.getSavepointRestoreSettings());
-
-		JobExecutionResult jobExecutionResult;
-		if (ctx.isDetached()) {
-			jobExecutionResult = ClientUtils.submitJob(ctx.getClient(), jobGraph);
-		} else {
-			jobExecutionResult = ClientUtils.submitJobAndWaitForResult(ctx.getClient(), jobGraph, ctx.getUserCodeClassLoader());
-		}
-
+		JobExecutionResult jobExecutionResult = super.execute(streamGraph);
 		ctx.setJobExecutionResult(jobExecutionResult);
-
 		return jobExecutionResult;
 	}
 }