You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2019/11/18 13:11:04 UTC
[flink] 08/19: [FLINK-XXXXX] Change Executor.execute() signature +
add Session and Job Cluster Executors
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch executor-impl
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 54a277b08dfe8ecb702c516392b01f41078df993
Author: Kostas Kloudas <kk...@gmail.com>
AuthorDate: Tue Nov 12 12:23:26 2019 +0100
[FLINK-XXXXX] Change Executor.execute() signature + add Session and Job Cluster Executors
change the signature of the executors and their implementation
---
.../java/org/apache/flink/client/ClientUtils.java | 30 +++----
.../client/deployment/executors/JobClientImpl.java | 82 +++++++++++++++++
.../deployment/executors/JobClusterExecutor.java | 100 +++++++++++++++++++++
.../executors/SessionClusterExecutor.java | 94 +++++++++++++++++++
.../apache/flink/client/program/ClientTest.java | 5 ++
.../org/apache/flink/core/execution/Executor.java | 4 +-
.../execution/{Executor.java => JobClient.java} | 23 +++--
.../flink/api/java/ExecutionEnvironment.java | 15 +++-
.../flink/api/java/ExecutorDiscoveryTest.java | 19 +++-
.../environment/StreamExecutionEnvironment.java | 17 +++-
.../environment/ExecutorDiscoveryTest.java | 18 +++-
11 files changed, 371 insertions(+), 36 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
index ac247ac..4a95a16 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/ClientUtils.java
@@ -19,22 +19,20 @@
package org.apache.flink.client;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.executors.JobClientImpl;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.client.program.ContextEnvironmentFactory;
-import org.apache.flink.client.program.DetachedJobExecutionResult;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.client.program.ProgramMissingJobException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
@@ -101,22 +99,18 @@ public enum ClientUtils {
return FlinkUserCodeClassLoaders.create(resolveOrder, urls, parent, alwaysParentFirstLoaderPatterns);
}
- public static CompletableFuture<JobID> submitJobAndGetJobID(ClusterClient<?> client, JobGraph jobGraph) {
+ public static CompletableFuture<JobClient> submitJobAndGetJobClient(ClusterClient<?> client, JobGraph jobGraph) {
return checkNotNull(client)
.submitJob(checkNotNull(jobGraph))
- .thenApply(JobSubmissionResult::getJobID);
- }
-
- public static CompletableFuture<JobResult> submitJobAndGetResult(ClusterClient<?> client, JobGraph jobGraph) {
- return submitJobAndGetJobID(client, jobGraph)
- .thenCompose(client::requestJobResult);
+ .thenApply(JobSubmissionResult::getJobID)
+ .thenApply(jobID -> new JobClientImpl<>(client, jobID));
}
public static JobExecutionResult submitJob(ClusterClient<?> client, JobGraph jobGraph) throws ProgramInvocationException {
try {
- return submitJobAndGetJobID(client, jobGraph)
- .thenApply(DetachedJobExecutionResult::new)
- .get();
+ return submitJobAndGetJobClient(client, jobGraph)
+ .thenCompose(JobClient::getJobSubmissionResult)
+ .get();
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);
throw new ProgramInvocationException("Could not run job in detached mode.", jobGraph.getJobID(), e);
@@ -129,17 +123,17 @@ public enum ClientUtils {
ClassLoader classLoader) throws ProgramInvocationException {
checkNotNull(classLoader);
- JobResult jobResult;
+ JobClient jobClient;
try {
- jobResult = submitJobAndGetResult(client, jobGraph).get();
+ jobClient = submitJobAndGetJobClient(client, jobGraph).get();
} catch (InterruptedException | ExecutionException e) {
ExceptionUtils.checkInterrupted(e);
throw new ProgramInvocationException("Could not run job", jobGraph.getJobID(), e);
}
try {
- return jobResult.toJobExecutionResult(classLoader);
- } catch (JobExecutionException | IOException | ClassNotFoundException e) {
+ return jobClient.getJobExecutionResult(classLoader).get();
+ } catch (Exception e) {
throw new ProgramInvocationException("Job failed", jobGraph.getJobID(), e);
}
}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
new file mode 100644
index 0000000..e042369
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClientImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.client.program.DetachedJobExecutionResult;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.util.ExceptionUtils;
+
+import javax.annotation.Nonnull;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Javadoc.
+ */
+public class JobClientImpl<ClusterID> implements JobClient {
+
+ private final ClusterClient<ClusterID> clusterClient;
+
+ private final JobID jobID;
+
+ public JobClientImpl(
+ final ClusterClient<ClusterID> clusterClient,
+ final JobID jobID) {
+ this.jobID = checkNotNull(jobID);
+ this.clusterClient = checkNotNull(clusterClient);
+ }
+
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+ return CompletableFuture.completedFuture(new DetachedJobExecutionResult(jobID));
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader) {
+ final CompletableFuture<JobExecutionResult> res = new CompletableFuture<>();
+
+ final CompletableFuture<JobResult> jobResultFuture = clusterClient.requestJobResult(jobID);
+ jobResultFuture.whenComplete(((jobResult, throwable) -> {
+ if (throwable != null) {
+ ExceptionUtils.checkInterrupted(throwable);
+ res.completeExceptionally(new ProgramInvocationException("Could not run job", jobID, throwable));
+ } else {
+ try {
+ res.complete(jobResult.toJobExecutionResult(userClassloader));
+ } catch (JobExecutionException | IOException | ClassNotFoundException e) {
+ res.completeExceptionally(new ProgramInvocationException("Job failed", jobID, e));
+ }
+ }
+ }));
+ return res;
+ }
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
new file mode 100644
index 0000000..902d7fd
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/JobClusterExecutor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.ClusterSpecification;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The {@link Executor} to be used when executing a job in isolation.
+ * This executor will start a cluster specifically for the job at hand and
+ * tear it down when the job is finished either successfully or due to an error.
+ */
+public class JobClusterExecutor<ClusterID> implements Executor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobClusterExecutor.class);
+
+ private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+ public JobClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
+ this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+ }
+
+ @Override
+ public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration executionConfig) throws Exception {
+ final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(executionConfig);
+
+ try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(executionConfig)) {
+ final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(executionConfig);
+
+ final List<URL> dependencies = configAccessor.getJars();
+ final List<URL> classpaths = configAccessor.getClasspaths();
+
+ final JobGraph jobGraph = getJobGraph(pipeline, executionConfig, classpaths, dependencies);
+
+ final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(executionConfig);
+
+ try (final ClusterClient<ClusterID> client = clusterDescriptor.deployJobCluster(clusterSpecification, jobGraph, configAccessor.getDetachedMode())) {
+ LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());
+ return CompletableFuture.completedFuture(new JobClientImpl<>(client, jobGraph.getJobID()));
+ }
+ }
+ }
+
+ private JobGraph getJobGraph(
+ final Pipeline pipeline,
+ final Configuration configuration,
+ final List<URL> classpaths,
+ final List<URL> libraries) {
+
+ checkNotNull(pipeline);
+ checkNotNull(configuration);
+ checkNotNull(classpaths);
+ checkNotNull(libraries);
+
+ final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+ final JobGraph jobGraph = FlinkPipelineTranslationUtil
+ .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+ jobGraph.addJars(libraries);
+ jobGraph.setClasspaths(classpaths);
+ jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+ return jobGraph;
+ }
+}
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
new file mode 100644
index 0000000..b7eeb68
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/SessionClusterExecutor.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.deployment.executors;
+
+import org.apache.flink.api.dag.Pipeline;
+import org.apache.flink.client.ClientUtils;
+import org.apache.flink.client.FlinkPipelineTranslationUtil;
+import org.apache.flink.client.cli.ExecutionConfigAccessor;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.Executor;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.net.URL;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * The {@link Executor} to be used when executing a job on an already running cluster.
+ */
+public class SessionClusterExecutor<ClusterID> implements Executor {
+
+ private final ClusterClientServiceLoader clusterClientServiceLoader;
+
+ public SessionClusterExecutor(final ClusterClientServiceLoader clusterClientServiceLoader) {
+ this.clusterClientServiceLoader = checkNotNull(clusterClientServiceLoader);
+ }
+
+ @Override
+ public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception {
+ final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+
+ final List<URL> dependencies = configAccessor.getJars();
+ final List<URL> classpaths = configAccessor.getClasspaths();
+
+ final JobGraph jobGraph = getJobGraph(pipeline, configuration, classpaths, dependencies);
+
+ final ClusterClientFactory<ClusterID> clusterClientFactory = clusterClientServiceLoader.getClusterClientFactory(configuration);
+
+ try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
+ final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
+ checkState(clusterID != null);
+
+ try (final ClusterClient<ClusterID> clusterClient = clusterDescriptor.retrieve(clusterID)) {
+ return ClientUtils.submitJobAndGetJobClient(clusterClient, jobGraph);
+ }
+ }
+ }
+
+ private JobGraph getJobGraph(
+ final Pipeline pipeline,
+ final Configuration configuration,
+ final List<URL> classpaths,
+ final List<URL> libraries) {
+
+ checkNotNull(pipeline);
+ checkNotNull(configuration);
+ checkNotNull(classpaths);
+ checkNotNull(libraries);
+
+ final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
+ final JobGraph jobGraph = FlinkPipelineTranslationUtil
+ .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
+
+ jobGraph.addJars(libraries);
+ jobGraph.setClasspaths(classpaths);
+ jobGraph.setSavepointRestoreSettings(executionConfigAccessor.getSavepointRestoreSettings());
+
+ return jobGraph;
+ }
+}
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
index 5845080..6e53abb 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/ClientTest.java
@@ -64,6 +64,7 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Simple and maybe stupid test to check the {@link ClusterClient} class.
@@ -193,6 +194,10 @@ public class ClientTest extends TestLogger {
@Test
public void tryLocalExecution() throws ProgramInvocationException, ProgramMissingJobException {
PackagedProgram packagedProgramMock = mock(PackagedProgram.class);
+
+ when(packagedProgramMock.getUserCodeClassLoader())
+ .thenReturn(packagedProgramMock.getClass().getClassLoader());
+
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
index 8515f43..5be3193 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.configuration.Configuration;
+import java.util.concurrent.CompletableFuture;
+
/**
* The entity responsible for executing a {@link Pipeline}, i.e. a user job.
*/
@@ -34,5 +36,5 @@ public interface Executor {
* @param configuration the {@link Configuration} with the required execution parameters
* @return the {@link JobExecutionResult} corresponding to the pipeline execution.
*/
- JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception;
+ CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration) throws Exception;
}
diff --git a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
similarity index 60%
copy from flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
copy to flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
index 8515f43..8440dd1 100644
--- a/flink-core/src/main/java/org/apache/flink/core/execution/Executor.java
+++ b/flink-core/src/main/java/org/apache/flink/core/execution/JobClient.java
@@ -18,21 +18,20 @@
package org.apache.flink.core.execution;
+import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.dag.Pipeline;
-import org.apache.flink.configuration.Configuration;
+
+import javax.annotation.Nonnull;
+
+import java.util.concurrent.CompletableFuture;
/**
- * The entity responsible for executing a {@link Pipeline}, i.e. a user job.
+ * A client that is scoped to a specific job.
*/
-public interface Executor {
+@PublicEvolving
+public interface JobClient {
+
+ CompletableFuture<JobExecutionResult> getJobSubmissionResult();
- /**
- * Executes a {@link Pipeline} based on the provided configuration.
- *
- * @param pipeline the {@link Pipeline} to execute
- * @param configuration the {@link Configuration} with the required execution parameters
- * @return the {@link JobExecutionResult} corresponding to the pipeline execution.
- */
- JobExecutionResult execute(Pipeline pipeline, Configuration configuration) throws Exception;
+ CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull final ClassLoader userClassloader);
}
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 5b07843..69abe17 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -57,6 +57,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.StringValue;
import org.apache.flink.util.NumberSequenceIterator;
@@ -132,6 +133,8 @@ public class ExecutionEnvironment {
private final Configuration configuration;
+ private ClassLoader userClassloader;
+
/**
* Creates a new Execution Environment.
*/
@@ -146,6 +149,11 @@ public class ExecutionEnvironment {
protected ExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
this.executorServiceLoader = checkNotNull(executorServiceLoader);
this.configuration = checkNotNull(executorConfiguration);
+ this.userClassloader = getClass().getClassLoader();
+ }
+
+ protected void setUserClassloader(final ClassLoader userClassloader) {
+ this.userClassloader = checkNotNull(userClassloader);
}
protected Configuration getConfiguration() {
@@ -796,7 +804,12 @@ public class ExecutionEnvironment {
executorServiceLoader.getExecutorFactory(configuration);
final Executor executor = executorFactory.getExecutor(configuration);
- lastJobExecutionResult = executor.execute(plan, configuration);
+
+ final JobClient jobClient = executor.execute(plan, configuration).get();
+ lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+ ? jobClient.getJobExecutionResult(userClassloader).get()
+ : jobClient.getJobSubmissionResult().get();
+
return lastJobExecutionResult;
}
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
index 49013b8..9acbf3d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/ExecutorDiscoveryTest.java
@@ -25,13 +25,17 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.util.OptionalFailure;
import org.junit.Test;
+import javax.annotation.Nonnull;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -46,6 +50,7 @@ public class ExecutorDiscoveryTest {
public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
final Configuration configuration = new Configuration();
configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+ configuration.set(DeploymentOptions.ATTACHED, true);
final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
@@ -78,7 +83,19 @@ public class ExecutorDiscoveryTest {
return (pipeline, executionConfig) -> {
final Map<String, OptionalFailure<Object>> res = new HashMap<>();
res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
- return new JobExecutionResult(new JobID(), 12L, res);
+
+ return CompletableFuture.completedFuture(new JobClient(){
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+ return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
+ }
+ });
};
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3870b52..7a59d5a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -53,6 +53,7 @@ import org.apache.flink.core.execution.DefaultExecutorServiceLoader;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
import org.apache.flink.core.execution.ExecutorServiceLoader;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -154,6 +155,8 @@ public class StreamExecutionEnvironment {
private final Configuration configuration;
+ private ClassLoader userClassloader;
+
// --------------------------------------------------------------------------------------------
// Constructor and Properties
// --------------------------------------------------------------------------------------------
@@ -166,9 +169,16 @@ public class StreamExecutionEnvironment {
this(new DefaultExecutorServiceLoader(), executorConfiguration);
}
- public StreamExecutionEnvironment(final ExecutorServiceLoader executorServiceLoader, final Configuration executorConfiguration) {
+ public StreamExecutionEnvironment(
+ final ExecutorServiceLoader executorServiceLoader,
+ final Configuration executorConfiguration) {
this.executorServiceLoader = checkNotNull(executorServiceLoader);
this.configuration = checkNotNull(executorConfiguration);
+ this.userClassloader = getClass().getClassLoader();
+ }
+
+ protected void setUserClassloader(final ClassLoader userClassloader) {
+ this.userClassloader = checkNotNull(userClassloader);
}
protected Configuration getConfiguration() {
@@ -1552,7 +1562,10 @@ public class StreamExecutionEnvironment {
executorServiceLoader.getExecutorFactory(configuration);
final Executor executor = executorFactory.getExecutor(configuration);
- return executor.execute(streamGraph, configuration);
+ final JobClient jobClient = executor.execute(streamGraph, configuration).get();
+ return configuration.getBoolean(DeploymentOptions.ATTACHED)
+ ? jobClient.getJobExecutionResult(userClassloader).get()
+ : jobClient.getJobSubmissionResult().get();
}
private void consolidateParallelismDefinitionsInConfiguration() {
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
index 9c11fdf..ce593c2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/environment/ExecutorDiscoveryTest.java
@@ -24,15 +24,19 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.Executor;
import org.apache.flink.core.execution.ExecutorFactory;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.util.OptionalFailure;
import org.junit.Test;
+import javax.annotation.Nonnull;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
@@ -47,6 +51,7 @@ public class ExecutorDiscoveryTest {
public void correctExecutorShouldBeInstantiatedBasedOnConfigurationOption() throws Exception {
final Configuration configuration = new Configuration();
configuration.set(DeploymentOptions.TARGET, IDReportingExecutorFactory.ID);
+ configuration.set(DeploymentOptions.ATTACHED, true);
final JobExecutionResult result = executeTestJobBasedOnConfig(configuration);
@@ -79,7 +84,18 @@ public class ExecutorDiscoveryTest {
return (pipeline, executionConfig) -> {
final Map<String, OptionalFailure<Object>> res = new HashMap<>();
res.put(DeploymentOptions.TARGET.key(), OptionalFailure.of(ID));
- return new JobExecutionResult(new JobID(), 12L, res);
+ return CompletableFuture.completedFuture(new JobClient(){
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobSubmissionResult() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<JobExecutionResult> getJobExecutionResult(@Nonnull ClassLoader userClassloader) {
+ return CompletableFuture.completedFuture(new JobExecutionResult(new JobID(), 12L, res));
+ }
+ });
};
}
}