Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/12/03 11:20:09 UTC

[GitHub] [flink] kl0u commented on a change in pull request #10392: [FLINK-14854][client] Add executeAsync() method to execution environments

kl0u commented on a change in pull request #10392: [FLINK-14854][client] Add executeAsync() method to execution environments
URL: https://github.com/apache/flink/pull/10392#discussion_r353118883
 
 

 ##########
 File path: flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
 ##########
 @@ -804,6 +805,47 @@ public JobExecutionResult execute() throws Exception {
 	 * @throws Exception Thrown, if the program executions fails.
 	 */
 	public JobExecutionResult execute(String jobName) throws Exception {
+		try (final JobClient jobClient = executeAsync(jobName).get()) {
+
+			lastJobExecutionResult = configuration.getBoolean(DeploymentOptions.ATTACHED)
+					? jobClient.getJobExecutionResult(userClassloader).get()
+					: new DetachedJobExecutionResult(jobClient.getJobID());
+
+			return lastJobExecutionResult;
+		}
+	}
+
+	/**
+	 * Triggers the program execution asynchronously. The environment will execute all parts of the program that have
+	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
+	 * writing results (e.g. {@link DataSet#writeAsText(String)},
+	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
+	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+	 *
+	 * <p>The program execution will be logged and displayed with a generated default name.
+	 *
+	 * @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
+	 * @throws Exception Thrown, if the program submission fails.
+	 */
+	@PublicEvolving
+	public final CompletableFuture<JobClient> executeAsync() throws Exception {
+		return executeAsync(getDefaultName());
+	}
+
+	/**
+	 * Triggers the program execution asynchronously. The environment will execute all parts of the program that have
+	 * resulted in a "sink" operation. Sink operations are for example printing results ({@link DataSet#print()},
+	 * writing results (e.g. {@link DataSet#writeAsText(String)},
+	 * {@link DataSet#write(org.apache.flink.api.common.io.FileOutputFormat, String)}, or other generic
+	 * data sinks created with {@link DataSet#output(org.apache.flink.api.common.io.OutputFormat)}.
+	 *
+	 * <p>The program execution will be logged and displayed with the given job name.
+	 *
+	 * @return A future of {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
+	 * @throws Exception Thrown, if the program submission fails.
+	 */
+	@PublicEvolving
 
 Review comment:
   I would also state prominently in the javadoc that the user is responsible for managing the lifecycle of the `JobClient`, e.g. by calling `close()`. Otherwise, they risk resource leaks.
   
   E.g. <b>ATTENTION:</b> The caller of this method is responsible for managing the lifecycle of the returned `JobClient`. This means calling `close()` once it is no longer needed. Otherwise, there may be resource leaks, depending on the `JobClient` implementation.
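
   To illustrate what that responsibility could look like on the caller side, here is a minimal sketch. It mirrors the try-with-resources pattern that `execute(String)` uses in the diff above. The `JobClient` interface, `FakeJobClient`, and the `executeAsync` helper below are hypothetical stand-ins written only for this example so that it compiles on its own; they are not Flink's actual classes, and the real `JobClient` may differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class JobClientLifecycleSketch {

    /** Hypothetical stand-in for a closeable job client (NOT Flink's real interface). */
    interface JobClient extends AutoCloseable {
        String getJobID();

        @Override
        void close(); // narrowed so that close() declares no checked exception
    }

    /** Hypothetical implementation that only records whether close() was called. */
    static final class FakeJobClient implements JobClient {
        final AtomicBoolean closed = new AtomicBoolean(false);

        @Override
        public String getJobID() {
            return "job-1";
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /** Hypothetical stand-in for executeAsync(): the future completes once "submission" succeeds. */
    static CompletableFuture<JobClient> executeAsync(JobClient client) {
        return CompletableFuture.completedFuture(client);
    }

    /** The caller owns the returned client, so it closes it via try-with-resources. */
    static String submitAndRelease(FakeJobClient client) throws Exception {
        try (JobClient jobClient = executeAsync(client).get()) {
            return jobClient.getJobID();
        } // jobClient.close() runs here, even if the body throws
    }

    public static void main(String[] args) throws Exception {
        FakeJobClient client = new FakeJobClient();
        String id = submitAndRelease(client);
        System.out.println(id + " closed=" + client.closed.get()); // prints "job-1 closed=true"
    }
}
```

   A caller that keeps the client around for longer (e.g. to poll the job) cannot use try-with-resources directly and would need to call `close()` explicitly in a `finally` block or an equivalent cleanup path.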
