You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/10/21 21:22:39 UTC
[flink] branch release-1.9 updated: [FLINK-14434][coordination] Dispatcher#createJobManagerRunner returns on creation succeed
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.9 by this push:
new 43ae341 [FLINK-14434][coordination] Dispatcher#createJobManagerRunner returns on creation succeed
43ae341 is described below
commit 43ae34172fd6680861f7c5c54a498e3c5da4b8a2
Author: tison <wa...@gmail.com>
AuthorDate: Sat Oct 19 02:03:02 2019 +0800
[FLINK-14434][coordination] Dispatcher#createJobManagerRunner returns on creation succeed
This closes #9940.
---
.../main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index 4909885..70f627a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -357,6 +357,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
return jobManagerRunnerFuture
+ .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
.thenApply(FunctionUtils.nullFn())
.whenCompleteAsync(
(ignored, throwable) -> {
@@ -370,7 +371,7 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
private CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph) {
final RpcService rpcService = getRpcService();
- final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = CompletableFuture.supplyAsync(
+ return CompletableFuture.supplyAsync(
CheckedSupplier.unchecked(() ->
jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
@@ -382,8 +383,6 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler)),
rpcService.getExecutor());
-
- return jobManagerRunnerFuture.thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner));
}
private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {