Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:04 UTC

[flink] 08/19: [FLINK-XXXXX] Change Executor.execute() signature + add Session and Job Cluster Executors

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54a277b08dfe8ecb702c516392b01f41078df993
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 12 12:23:26 2019 +0100

    [FLINK-XXXXX] Change Executor.execute() signature + add Session and Job Cluster Executors
    
    Change the signature of the executors and their implementations.
---
 .../java/org/apache/flink/client/ClientUtils.java  |  30 +++----
 .../client/deployment/executors/JobClientImpl.java |  82 +++++++++++++++++
 .../deployment/executors/JobClusterExecutor.java   | 100 +++++++++++++++++++++
 .../executors/SessionClusterExecutor.java          |  94 +++++++++++++++++++
 .../apache/flink/client/program/ClientTest.java    |   5 ++
 .../org/apache/flink/core/execution/Executor.java  |   4 +-
 .../execution/{Executor.java => JobClient.java}    |  23 +++--
 .../flink/api/java/ExecutionEnvironment.java       |  15 +++-
 .../flink/api/java/ExecutorDiscoveryTest.java      |  19 +++-
 .../environment/StreamExecutionEnvironment.java    |  17 +++-
 .../environment/ExecutorDiscoveryTest.java         |  18 +++-
 11 files changed, 371 insertions(+), 36 deletions(-)
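
For readers skimming the diff below, the substance of the change is that Executor#execute() now returns a CompletableFuture<JobClient> instead of a blocking JobExecutionResult, and the environments use DeploymentOptions.ATTACHED to decide whether to wait for the final result or only for submission. A minimal caller-side sketch, not part of the commit (the class and method names here are illustrative only):

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.core.execution.Executor;
    import org.apache.flink.core.execution.JobClient;

    // Sketch only: mirrors what ExecutionEnvironment#execute() does after this commit.
    public final class ExecuteSketch {

        static JobExecutionResult run(
                Executor executor,
                Pipeline pipeline,
                Configuration configuration,
                ClassLoader userClassLoader) throws Exception {

            // The executor no longer blocks until the job finishes; it hands back a JobClient.
            final JobClient jobClient = executor.execute(pipeline, configuration).get();

            // ATTACHED: wait for the final JobExecutionResult.
            // Detached: return as soon as the job has been submitted.
            return configuration.getBoolean(DeploymentOptions.ATTACHED)
                    ? jobClient.getJobExecutionResult(userClassLoader).get()
                    : jobClient.getJobSubmissionResult().get();
        }
    }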

diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index ac247ac..4a95a16 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -19,22 +19,20 @@
 package org.apache.flink.client;
 
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.client.program.ContextEnvironmentFactory;
-import org.apache.flink.client.program.DetachedJobExecutionResult;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.slf4j.Logger;
@@ -101,22 +99,18 @@ public enum ClientUtils {
 		return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns);
 	}
 
-	public static CompletableFuture<JobID> submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) {
+	public static CompletableFuture<JobClient> submitJobAndGetJobClient(ClusterClient<?> client, JobGraph jobGraph) {
 		return checkNotNull(client)
 				.submitJob(checkNotNull(jobGraph))
-				.thenApply(JobSubmissionResult::getJobID);
-	}
-
-	public static CompletableFuture<JobResult> submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) {
-		return submitJobAndGetJobID(client, jobGraph)
-				.thenCompose(client::requestJobResult);
+				.thenApply(JobSubmissionResult::getJobID)
+				.thenApply(jobID -> new JobClientImpl<>(client, jobID));
 	}
 
 	public static JobExecutionResult submitJob(ClusterClient<?> client, JobGraph jobGraph) throws ProgramInvocationException {
 		try {
-			return submitJobAndGetJobID(client, jobGraph)
-				.thenApply(DetachedJobExecutionResult::new)
-				.get();
+			return submitJobAndGetJobClient(client, jobGraph)
+					.thenCompose(JobClient::getJobSubmissionResult)
+					.get();
 		} catch (InterruptedException | ExecutionException e) {
 			ExceptionUtils.checkInterrupted(e);
 			throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
@@ -129,17 +123,17 @@ public enum ClientUtils {
 			ClassLoader classLoader) throws ProgramInvocationException {
 		checkNotNull(classLoader);
 
-		JobResult jobResult;
+		JobClient jobClient;
 		try {
-			jobResult = submitJobAndGetResult(client, jobGraph).get();
+			jobClient = submitJobAndGetJobClient(client, jobGraph).get();
 		} catch (InterruptedException | ExecutionException e) {
 			ExceptionUtils.checkInterrupted(e);
 			throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
 		}
 
 		try {
-			return jobResult.toJobExecutionResult(classLoader);
-		} catch (JobExecutionException | IOException | ClassNotFoundException e) {
+			return jobClient.getJobExecutionResult(classLoader).get();
+		} catch (Exception e) {
 			throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
 		}
 	}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
new file mode 100644
index 0000000..e042369
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@link JobClient} bound to a {@link ClusterClient} and a specific {@link JobID}.
+ */
+public class JobClientImpl<ClusterID> implements JobClient {
+
+	private final ClusterClient<ClusterID> clusterClient;
+
+	private final JobID jobID;
+
+	public JobClientImpl(
+			final ClusterClient<ClusterID> clusterClient,
+			final JobID jobID) {
+		this.jobID = checkNotNull(jobID);
+		this.clusterClient = checkNotNull(clusterClient);
+	}
+
+	public JobID getJobID() {
+		return jobID;
+	}
+
+	@Override
+	public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+		return CompletableFuture.completedFuture(new DetachedJobExecutionResult(jobID));
+	}
+
+	@Override
+	public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader) {
+		final CompletableFuture<JobExecutionResult> res = new CompletableFuture<>();
+
+		final CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobID);
+		jobResultFuture.whenComplete(((jobResult, throwable) -> {
+			if (throwable != null) {
+				ExceptionUtils.checkInterrupted(throwable);
+				res.completeExceptionally(new ProgramInvocationException("Could not run job", jobID, throwable));
+			} else {
+				try {
+					res.complete(jobResult.toJobExecutionResult(userClassloader));
+				} catch (JobExecutionException | IOException | ClassNotFoundException e) {
+					res.completeExceptionally(new ProgramInvocationException("Job failed", jobID, e));
+				}
+			}
+		}));
+		return res;
+	}
+}
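
The JobClientImpl above binds a ClusterClient to a JobID. For orientation, a hedged sketch of how such a client is obtained, mirroring the ClientUtils.submitJobAndGetJobClient() change earlier in this diff (the wrapper class and method names are illustrative, not part of the commit):

    import java.util.concurrent.CompletableFuture;

    import org.apache.flink.api.common.JobSubmissionResult;
    import org.apache.flink.client.deployment.executors.JobClientImpl;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.core.execution.JobClient;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    // Sketch only: submit a JobGraph and wrap the resulting JobID in a JobClientImpl.
    public final class JobClientSketch {

        static <T> CompletableFuture<JobClient> submit(ClusterClient<T> clusterClient, JobGraph jobGraph) {
            return clusterClient
                    .submitJob(jobGraph)                              // CompletableFuture<JobSubmissionResult>
                    .thenApply(JobSubmissionResult::getJobID)
                    .thenApply(jobId -> new JobClientImpl<>(clusterClient, jobId));
        }
    }

As implemented above, getJobSubmissionResult() completes immediately with a DetachedJobExecutionResult, while getJobExecutionResult(classLoader) requests the JobResult from the cluster and converts it with the user class loader.
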
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
new file mode 100644
index 0000000..902d7fd
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an error.
+ */
+public class JobClusterExecutor<ClusterID> implements Executor {
+
+	private static final Logger LOG = LoggerFactory.getLogger(JobClusterExecutor.class);
+
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+	public JobClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
+			final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+
+			final List<URL> dependencies = configAccessor.getJars();
+			final List<URL> classpaths = configAccessor.getClasspaths();
+
+			final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, dependencies);
+
+			final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
+
+			try (final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
+				LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+				return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
+			}
+		}
+	}
+
+	private JobGraph getJobGraph(
+			final Pipeline pipeline,
+			final Configuration configuration,
+			final List<URL> classpaths,
+			final List<URL> libraries) {
+
+		checkNotNull(pipeline);
+		checkNotNull(configuration);
+		checkNotNull(classpaths);
+		checkNotNull(libraries);
+
+		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+		final JobGraph jobGraph = FlinkPipelineTranslationUtil
+				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+		jobGraph.addJars(libraries);
+		jobGraph.setClasspaths(classpaths);
+		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+		return jobGraph;
+	}
+}
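
As the class Javadoc above says, this executor targets per-job deployments: it translates the Pipeline to a JobGraph, deploys a dedicated cluster sized by the ClusterSpecification derived from the configuration, and hands back a JobClient for the submitted job. A hedged sketch of how it might be driven; the service loader and the populated Configuration (deployment target, jars, detached flag, cluster size) are assumed to come from the CLI or the environment, and the wrapper class is illustrative only:

    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.client.deployment.ClusterClientServiceLoader;
    import org.apache.flink.client.deployment.executors.JobClusterExecutor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.execution.JobClient;

    // Sketch only: spin up a dedicated job cluster for the given pipeline.
    public final class JobClusterExecutorSketch {

        static JobClient deployAndSubmit(
                ClusterClientServiceLoader serviceLoader,  // discovers the ClusterClientFactory for the configured target
                Pipeline pipeline,
                Configuration configuration) throws Exception {

            return new JobClusterExecutor<>(serviceLoader)
                    .execute(pipeline, configuration)
                    .get();
        }
    }
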
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
new file mode 100644
index 0000000..b7eeb68
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ */
+public class SessionClusterExecutor<ClusterID> implements Executor {
+
+	private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+	public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
+		this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+	}
+
+	@Override
+	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
+		final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+		final List<URL> dependencies = configAccessor.getJars();
+		final List<URL> classpaths = configAccessor.getClasspaths();
+
+		final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
+
+		final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
+
+		try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+			final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+			checkState(clusterID != null);
+
+			try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+				return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
+			}
+		}
+	}
+
+	private JobGraph getJobGraph(
+			final Pipeline pipeline,
+			final Configuration configuration,
+			final List<URL> classpaths,
+			final List<URL> libraries) {
+
+		checkNotNull(pipeline);
+		checkNotNull(configuration);
+		checkNotNull(classpaths);
+		checkNotNull(libraries);
+
+		final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+		final JobGraph jobGraph = FlinkPipelineTranslationUtil
+				.getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+		jobGraph.addJars(libraries);
+		jobGraph.setClasspaths(classpaths);
+		jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+		return jobGraph;
+	}
+}
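
In contrast to the job-cluster variant, the executor above retrieves a cluster that is already running, so the configuration passed to execute() has to identify both the deployment target and the existing cluster id (otherwise the checkState above fails). The concrete keys are owned by the respective ClusterClientFactory and are not part of this commit; the target value below is illustrative only:

    import org.apache.flink.api.dag.Pipeline;
    import org.apache.flink.client.deployment.ClusterClientServiceLoader;
    import org.apache.flink.client.deployment.executors.SessionClusterExecutor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.DeploymentOptions;
    import org.apache.flink.core.execution.JobClient;

    // Sketch only: submit against a session cluster that is already running.
    public final class SessionSubmitSketch {

        static JobClient submit(
                ClusterClientServiceLoader serviceLoader,
                Pipeline pipeline,
                Configuration configuration) throws Exception {

            // Selects which ClusterClientFactory the service loader returns; that factory
            // then reads the id of the running session cluster from the same configuration.
            configuration.set(DeploymentOptions.TARGET, "remote");  // illustrative target value

            return new SessionClusterExecutor<>(serviceLoader)
                    .execute(pipeline, configuration)
                    .get();
        }
    }
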
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 5845080..6e53abb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -193,6 +194,10 @@ public class ClientTest extends TestLogger {
 	@Test
 	public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
 		PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
+
+		when(packagedProgramMock.getUserCodeClassLoader())
+				.thenReturn(packagedProgramMock.getClass().getClassLoader());
+
 		doAnswer(new Answer<Void>() {
 			@Override
 			public Void answer(InvocationOnMock invocation) throws Throwable {
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 8515f43..5be3193 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.dag.Pipeline;
 import org.apache.flink.configuration.Configuration;
 
+import java.util.concurrent.CompletableFuture;
+
 /**
  * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
  */
@@ -34,5 +36,5 @@ public interface Executor {
 	 * @param configuration the {@link Configuration} with the required execution parameters
 	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
 	 */
-	JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception;
+	CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception;
 }
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
similarity index 60%
copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8515f43..8440dd1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -18,21 +18,20 @@
 
 package org.apache.flink.core.execution;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
 
 /**
- * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
+ * A client that is scoped to a specific job.
  */
-public interface Executor {
+@PublicEvolving
+public interface JobClient {
+
+	CompletableFuture<JobExecutionResult> getJobSubmissionResult();
 
-	/**
-	 * Executes a {@link Pipeline} based on the provided configuration.
-	 *
-	 * @param pipeline the {@link Pipeline} to execute
-	 * @param configuration the {@link Configuration} with the required execution parameters
-	 * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
-	 */
-	JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception;
+	CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader);
 }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5b07843..69abe17 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -57,6 +57,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -132,6 +133,8 @@ public class ExecutionEnvironment {
 
 	private final Configuration configuration;
 
+	private ClassLoader userClassloader;
+
 	/**
 	 * Creates a new Execution Environment.
 	 */
@@ -146,6 +149,11 @@ public class ExecutionEnvironment {
 	protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(executorConfiguration);
+		this.userClassloader = getClass().getClassLoader();
+	}
+
+	protected void setUserClassloader(final ClassLoader userClassloader) {
+		this.userClassloader = checkNotNull(userClassloader);
 	}
 
 	protected Configuration getConfiguration() {
@@ -796,7 +804,12 @@ public class ExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		lastJobExecutionResult = executor.execute(plan, configuration);
+
+		final JobClient jobClient = executor.execute(plan, configuration).get();
+		lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+				? jobClient.getJobExecutionResult(userClassloader).get()
+				: jobClient.getJobSubmissionResult().get();
+
 		return lastJobExecutionResult;
 	}
 
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 49013b8..9acbf3d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -25,13 +25,17 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -46,6 +50,7 @@ public class ExecutorDiscoveryTest {
 	public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
 		final Configuration configuration = new Configuration();
 		configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+		configuration.set(DeploymentOptions.ATTACHED, true);
 
 		final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
 
@@ -78,7 +83,19 @@ public class ExecutorDiscoveryTest {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
 				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
-				return new JobExecutionResult(new JobID(), 12L, res);
+
+				return CompletableFuture.completedFuture(new JobClient(){
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+						throw new UnsupportedOperationException();
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
+					}
+				});
 			};
 		}
 	}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3870b52..7a59d5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
 import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -154,6 +155,8 @@ public class StreamExecutionEnvironment {
 
 	private final Configuration configuration;
 
+	private ClassLoader userClassloader;
+
 	// --------------------------------------------------------------------------------------------
 	// Constructor and Properties
 	// --------------------------------------------------------------------------------------------
@@ -166,9 +169,16 @@ public class StreamExecutionEnvironment {
 		this(new DefaultExecutorServiceLoader(), executorConfiguration);
 	}
 
-	public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
+	public StreamExecutionEnvironment(
+			final ExecutorServiceLoader executorServiceLoader,
+			final Configuration executorConfiguration) {
 		this.executorServiceLoader = checkNotNull(executorServiceLoader);
 		this.configuration = checkNotNull(executorConfiguration);
+		this.userClassloader = getClass().getClassLoader();
+	}
+
+	protected void setUserClassloader(final ClassLoader userClassloader) {
+		this.userClassloader = checkNotNull(userClassloader);
 	}
 
 	protected Configuration getConfiguration() {
@@ -1552,7 +1562,10 @@ public class StreamExecutionEnvironment {
 				executorServiceLoader.getExecutorFactory(configuration);
 
 		final Executor executor = executorFactory.getExecutor(configuration);
-		return executor.execute(streamGraph, configuration);
+		final JobClient jobClient = executor.execute(streamGraph, configuration).get();
+		return configuration.getBoolean(DeploymentOptions.ATTACHED)
+				? jobClient.getJobExecutionResult(userClassloader).get()
+				: jobClient.getJobSubmissionResult().get();
 	}
 
 	private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index 9c11fdf..ce593c2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -24,15 +24,19 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.Executor;
 import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.util.OptionalFailure;
 
 import org.junit.Test;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -47,6 +51,7 @@ public class ExecutorDiscoveryTest {
 	public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
 		final Configuration configuration = new Configuration();
 		configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+		configuration.set(DeploymentOptions.ATTACHED, true);
 
 		final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
 
@@ -79,7 +84,18 @@ public class ExecutorDiscoveryTest {
 			return (pipeline, executionConfig) -> {
 				final Map<String, OptionalFailure<Object>> res = new HashMap<>();
 				res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
-				return new JobExecutionResult(new JobID(), 12L, res);
+				return CompletableFuture.completedFuture(new JobClient(){
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+						throw new UnsupportedOperationException();
+					}
+
+					@Override
+					public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+						return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
+					}
+				});
 			};
 		}
 	}