You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/28 22:16:07 UTC
[flink] 01/02: [FLINK-11414] Introduce JobMasterService interface
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9fb3025e12a38136fa583110f8f267091c3db19a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 22 16:49:21 2019 +0100
[FLINK-11414] Introduce JobMasterService interface
For a better separation of concerns in the JobManagerRunner, this commit introduces
a JobMasterService which only exposes the JobMaster's lifecycle methods to the
JobManagerRunner. This allows for an easier substitution when testing the JobManagerRunner.
This closes #7563.
---
.../flink/runtime/jobmaster/JobManagerRunner.java | 15 +++--
.../apache/flink/runtime/jobmaster/JobMaster.java | 17 +++++-
.../flink/runtime/jobmaster/JobMasterService.java | 65 ++++++++++++++++++++++
3 files changed, 88 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index c10a535..e5e545f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -77,7 +77,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
private final JobManagerSharedServices jobManagerSharedServices;
- private final JobMaster jobMaster;
+ private final JobMasterService jobMasterService;
private final FatalErrorHandler fatalErrorHandler;
@@ -152,7 +152,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
rpcService);
// now start the JobManager
- this.jobMaster = new JobMaster(
+ this.jobMasterService = new JobMaster(
rpcService,
jobMasterConfiguration,
resourceId,
@@ -212,8 +212,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
setNewLeaderGatewayFuture();
leaderGatewayFuture.completeExceptionally(new FlinkException("JobMaster has been shut down."));
- jobMaster.shutDown();
- final CompletableFuture<Void> jobManagerTerminationFuture = jobMaster.getTerminationFuture();
+ final CompletableFuture<Void> jobManagerTerminationFuture = jobMasterService.closeAsync();
jobManagerTerminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
@@ -328,7 +327,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
- final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);
+ final CompletableFuture<Acknowledge> startFuture = jobMasterService.start(new JobMasterId(leaderSessionId), rpcTimeout);
final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
startFuture.whenCompleteAsync(
@@ -345,7 +344,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
private void confirmLeaderSessionIdIfStillLeader(UUID leaderSessionId, CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture) {
if (leaderElectionService.hasLeadership(leaderSessionId)) {
- currentLeaderGatewayFuture.complete(jobMaster.getSelfGateway(JobMasterGateway.class));
+ currentLeaderGatewayFuture.complete(jobMasterService.getGateway());
leaderElectionService.confirmLeaderSessionID(leaderSessionId);
} else {
log.debug("Ignoring confirmation of leader session id because {} is no longer the leader.", getAddress());
@@ -365,7 +364,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
setNewLeaderGatewayFuture();
- CompletableFuture<Acknowledge> suspendFuture = jobMaster.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);
+ CompletableFuture<Acknowledge> suspendFuture = jobMasterService.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);
suspendFuture.whenCompleteAsync(
(Acknowledge ack, Throwable throwable) -> {
@@ -396,7 +395,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
@Override
public String getAddress() {
- return jobMaster.getAddress();
+ return jobMasterService.getAddress();
}
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c63e5f5..3679d86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -145,7 +145,7 @@ import static org.apache.flink.util.Preconditions.checkState;
* given task</li>
* </ul>
*/
-public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
+public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
/** Default names for Flink's distributed components. */
public static final String JOB_MANAGER_NAME = "jobmanager";
@@ -1500,6 +1500,21 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
}
//----------------------------------------------------------------------------------------------
+ // Service methods
+ //----------------------------------------------------------------------------------------------
+
+ @Override
+ public JobMasterGateway getGateway() {
+ return getSelfGateway(JobMasterGateway.class);
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ shutDown();
+ return getTerminationFuture();
+ }
+
+ //----------------------------------------------------------------------------------------------
// Utility classes
//----------------------------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java
new file mode 100644
index 0000000..09a5870
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AutoCloseableAsync;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface which specifies the JobMaster service.
+ */
+public interface JobMasterService extends AutoCloseableAsync {
+
+ /**
+ * Start the JobMaster service with the given {@link JobMasterId}.
+ *
+ * @param jobMasterId to start the service with
+ * @param rpcTimeout timeout of this operation
+ * @return Future which is completed once the JobMaster service has been started
+ * @throws Exception if the JobMaster service could not be started
+ */
+ CompletableFuture<Acknowledge> start(JobMasterId jobMasterId, Time rpcTimeout) throws Exception;
+
+ /**
+ * Suspend the JobMaster service. This means that the service will stop reacting
+ * to messages.
+ *
+ * @param cause for the suspension
+ * @param rpcTimeout timeout of this operation
+ * @return Future which is completed once the JobMaster service has been suspended
+ */
+ CompletableFuture<Acknowledge> suspend(Exception cause, Time rpcTimeout);
+
+ /**
+ * Get the {@link JobMasterGateway} belonging to this service.
+ *
+ * @return JobMasterGateway belonging to this service
+ */
+ JobMasterGateway getGateway();
+
+ /**
+ * Get the address of the JobMaster service under which it is reachable.
+ *
+ * @return Address of the JobMaster service
+ */
+ String getAddress();
+}