You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/05/28 19:40:41 UTC
[flink] branch release-1.11 updated: [FLINK-17744] Make (Stream)ContextEnvironment#execute call JobListener#onJobExecuted
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 0794089 [FLINK-17744] Make (Stream)ContextEnvironment#execute call JobListener#onJobExecuted
0794089 is described below
commit 0794089ac3337835af64889d185020d30dde973e
Author: Echo Lee <13...@qq.com>
AuthorDate: Tue May 26 16:44:13 2020 +0800
[FLINK-17744] Make (Stream)ContextEnvironment#execute call JobListener#onJobExecuted
This closes #12339.
---
.../flink/client/program/ContextEnvironment.java | 29 ++++++++++++++++++++--
.../client/program/StreamContextEnvironment.java | 26 ++++++++++++++++++-
.../flink/api/java/ExecutionEnvironment.java | 7 ++++++
3 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index ea62b80..f7e515b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -26,16 +26,21 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Execution Environment for remote execution with the Client.
*/
@@ -64,7 +69,26 @@ public class ContextEnvironment extends ExecutionEnvironment {
@Override
public JobExecutionResult execute(String jobName) throws Exception {
- JobClient jobClient = executeAsync(jobName);
+ final JobClient jobClient = executeAsync(jobName);
+ final List<JobListener> jobListeners = getJobListeners();
+
+ try {
+ final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
+ jobListeners.forEach(jobListener ->
+ jobListener.onJobExecuted(jobExecutionResult, null));
+ return jobExecutionResult;
+ } catch (Throwable t) {
+ jobListeners.forEach(jobListener ->
+ jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
+ ExceptionUtils.rethrowException(t);
+
+ // never reached, only make javac happy
+ return null;
+ }
+ }
+
+ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
+ checkNotNull(jobClient);
JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
@@ -81,7 +105,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
ContextEnvironment.class.getSimpleName(),
LOG);
jobExecutionResultFuture.whenComplete((ignored, throwable) ->
- ShutdownHookUtil.removeShutdownHook(shutdownHook, ContextEnvironment.class.getSimpleName(), LOG));
+ ShutdownHookUtil.removeShutdownHook(
+ shutdownHook, ContextEnvironment.class.getSimpleName(), LOG));
}
jobExecutionResult = jobExecutionResultFuture.get();
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 697c9a0..ec81d83 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -24,19 +24,24 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.core.execution.DetachedJobExecutionResult;
import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
* Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
* testing utilities create a {@link StreamExecutionEnvironment} that should be used when
@@ -68,7 +73,26 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
- JobClient jobClient = executeAsync(streamGraph);
+ final JobClient jobClient = executeAsync(streamGraph);
+ final List<JobListener> jobListeners = getJobListeners();
+
+ try {
+ final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
+ jobListeners.forEach(jobListener ->
+ jobListener.onJobExecuted(jobExecutionResult, null));
+ return jobExecutionResult;
+ } catch (Throwable t) {
+ jobListeners.forEach(jobListener ->
+ jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
+ ExceptionUtils.rethrowException(t);
+
+ // never reached, only make javac happy
+ return null;
+ }
+ }
+
+ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
+ checkNotNull(jobClient);
JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a227f84..75c8dd1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -213,6 +213,13 @@ public class ExecutionEnvironment {
}
/**
+ * Gets the config JobListeners.
+ */
+ protected List<JobListener> getJobListeners() {
+ return jobListeners;
+ }
+
+ /**
* Gets the parallelism with which operation are executed by default. Operations can
* individually override this value to use a specific parallelism via
* {@link Operator#setParallelism(int)}. Other operations may need to run with a different