You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/05/28 19:40:41 UTC

[flink] branch release-1.11 updated: [FLINK-17744] Make (Stream)ContextEnvironment#execute call JobListener#onJobExecuted

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new 0794089  [FLINK-17744] Make (Stream)ContextEnvironment#execute call JobListener#onJobExecuted
0794089 is described below

commit 0794089ac3337835af64889d185020d30dde973e
Author: Echo Lee <13...@qq.com>
AuthorDate: Tue May 26 16:44:13 2020 +0800

    [FLINK-17744] Make (Stream)ContextEnvironment#execute call JobListener#onJobExecuted
    
    This closes #12339.
---
 .../flink/client/program/ContextEnvironment.java   | 29 ++++++++++++++++++++--
 .../client/program/StreamContextEnvironment.java   | 26 ++++++++++++++++++-
 .../flink/api/java/ExecutionEnvironment.java       |  7 ++++++
 3 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
index ea62b80..f7e515b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java
@@ -26,16 +26,21 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Execution Environment for remote execution with the Client.
  */
@@ -64,7 +69,26 @@ public class ContextEnvironment extends ExecutionEnvironment {
 
 	@Override
 	public JobExecutionResult execute(String jobName) throws Exception {
-		JobClient jobClient = executeAsync(jobName);
+		final JobClient jobClient = executeAsync(jobName);
+		final List<JobListener> jobListeners = getJobListeners();
+
+		try {
+			final JobExecutionResult  jobExecutionResult = getJobExecutionResult(jobClient);
+			jobListeners.forEach(jobListener ->
+					jobListener.onJobExecuted(jobExecutionResult, null));
+			return jobExecutionResult;
+		} catch (Throwable t) {
+			jobListeners.forEach(jobListener ->
+					jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
+			ExceptionUtils.rethrowException(t);
+
+			// never reached; only present to make javac happy
+			return null;
+		}
+	}
+
+	private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
+		checkNotNull(jobClient);
 
 		JobExecutionResult jobExecutionResult;
 		if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
@@ -81,7 +105,8 @@ public class ContextEnvironment extends ExecutionEnvironment {
 						ContextEnvironment.class.getSimpleName(),
 						LOG);
 				jobExecutionResultFuture.whenComplete((ignored, throwable) ->
-						ShutdownHookUtil.removeShutdownHook(shutdownHook, ContextEnvironment.class.getSimpleName(), LOG));
+						ShutdownHookUtil.removeShutdownHook(
+							shutdownHook, ContextEnvironment.class.getSimpleName(), LOG));
 			}
 
 			jobExecutionResult = jobExecutionResultFuture.get();
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
index 697c9a0..ec81d83 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/StreamContextEnvironment.java
@@ -24,19 +24,24 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.DeploymentOptions;
 import org.apache.flink.core.execution.DetachedJobExecutionResult;
 import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.core.execution.PipelineExecutorServiceLoader;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
 import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.ShutdownHookUtil;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Special {@link StreamExecutionEnvironment} that will be used in cases where the CLI client or
  * testing utilities create a {@link StreamExecutionEnvironment} that should be used when
@@ -68,7 +73,26 @@ public class StreamContextEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
-		JobClient jobClient = executeAsync(streamGraph);
+		final JobClient jobClient = executeAsync(streamGraph);
+		final List<JobListener> jobListeners = getJobListeners();
+
+		try {
+			final JobExecutionResult  jobExecutionResult = getJobExecutionResult(jobClient);
+			jobListeners.forEach(jobListener ->
+					jobListener.onJobExecuted(jobExecutionResult, null));
+			return jobExecutionResult;
+		} catch (Throwable t) {
+			jobListeners.forEach(jobListener ->
+					jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
+			ExceptionUtils.rethrowException(t);
+
+			// never reached; only present to make javac happy
+			return null;
+		}
+	}
+
+	private JobExecutionResult getJobExecutionResult(final JobClient jobClient) throws Exception {
+		checkNotNull(jobClient);
 
 		JobExecutionResult jobExecutionResult;
 		if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index a227f84..75c8dd1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -213,6 +213,13 @@ public class ExecutionEnvironment {
 	}
 
 	/**
+	 * Gets the configured JobListeners.
+	 */
+	protected List<JobListener> getJobListeners() {
+		return jobListeners;
+	}
+
+	/**
 	 * Gets the parallelism with which operation are executed by default. Operations can
 	 * individually override this value to use a specific parallelism via
 	 * {@link Operator#setParallelism(int)}. Other operations may need to run with a different