You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/10/21 21:22:39 UTC

[flink] branch release-1.9 updated: [FLINK-14434][coordination] Dispatcher#createJobManagerRunner returns on creation succeed

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 43ae341  [FLINK-14434][coordination] Dispatcher#createJobManagerRunner returns on creation succeed
43ae341 is described below

commit 43ae34172fd6680861f7c5c54a498e3c5da4b8a2
Author: tison <wa...@gmail.com>
AuthorDate: Sat Oct 19 02:03:02 2019 +0800

    [FLINK-14434][coordination] Dispatcher#createJobManagerRunner returns on creation succeed
    
    This closes #9940.
---
 .../main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java    | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4909885..70f627a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -357,6 +357,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 		jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
 
 		return jobManagerRunnerFuture
+			.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
 			.thenApply(FunctionUtils.nullFn())
 			.whenCompleteAsync(
 				(ignored, throwable) -> {
@@ -370,7 +371,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 	private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
 		final RpcService rpcService = getRpcService();
 
-		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
+		return CompletableFuture.supplyAsync(
 			CheckedSupplier.unchecked(() ->
 				jobManagerRunnerFactory.createJobManagerRunner(
 					jobGraph,
@@ -382,8 +383,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
 					new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
 					fatalErrorHandler)),
 			rpcService.getExecutor());
-
-		return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
 	}
 
 	private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {