You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/28 22:16:07 UTC

[flink] 01/02: [FLINK-11414] Introduce JobMasterService interface

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9fb3025e12a38136fa583110f8f267091c3db19a
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 22 16:49:21 2019 +0100

    [FLINK-11414] Introduce JobMasterService interface
    
    For a better separation of concerns in the JobManagerRunner, this commit introduces
    a JobMasterService interface which exposes only the JobMaster's lifecycle methods to the
    JobManagerRunner. This makes it easier to substitute the JobMaster when testing the JobManagerRunner.
    
    This closes #7563.
---
 .../flink/runtime/jobmaster/JobManagerRunner.java  | 15 +++--
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 17 +++++-
 .../flink/runtime/jobmaster/JobMasterService.java  | 65 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index c10a535..e5e545f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -77,7 +77,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 	private final JobManagerSharedServices jobManagerSharedServices;
 
-	private final JobMaster jobMaster;
+	private final JobMasterService jobMasterService;
 
 	private final FatalErrorHandler fatalErrorHandler;
 
@@ -152,7 +152,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				rpcService);
 
 			// now start the JobManager
-			this.jobMaster = new JobMaster(
+			this.jobMasterService = new JobMaster(
 				rpcService,
 				jobMasterConfiguration,
 				resourceId,
@@ -212,8 +212,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 				setNewLeaderGatewayFuture();
 				leaderGatewayFuture.completeExceptionally(new FlinkException("JobMaster has been shut down."));
 
-				jobMaster.shutDown();
-				final CompletableFuture<Void> jobManagerTerminationFuture = jobMaster.getTerminationFuture();
+				final CompletableFuture<Void> jobManagerTerminationFuture = jobMasterService.closeAsync();
 
 				jobManagerTerminationFuture.whenComplete(
 					(Void ignored, Throwable throwable) -> {
@@ -328,7 +327,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 			runningJobsRegistry.setJobRunning(jobGraph.getJobID());
 
-			final CompletableFuture<Acknowledge> startFuture = jobMaster.start(new JobMasterId(leaderSessionId), rpcTimeout);
+			final CompletableFuture<Acknowledge> startFuture = jobMasterService.start(new JobMasterId(leaderSessionId), rpcTimeout);
 			final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
 
 			startFuture.whenCompleteAsync(
@@ -345,7 +344,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 	private void confirmLeaderSessionIdIfStillLeader(UUID leaderSessionId, CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture) {
 		if (leaderElectionService.hasLeadership(leaderSessionId)) {
-			currentLeaderGatewayFuture.complete(jobMaster.getSelfGateway(JobMasterGateway.class));
+			currentLeaderGatewayFuture.complete(jobMasterService.getGateway());
 			leaderElectionService.confirmLeaderSessionID(leaderSessionId);
 		} else {
 			log.debug("Ignoring confirmation of leader session id because {} is no longer the leader.", getAddress());
@@ -365,7 +364,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 			setNewLeaderGatewayFuture();
 
-			CompletableFuture<Acknowledge>  suspendFuture = jobMaster.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);
+			CompletableFuture<Acknowledge>  suspendFuture = jobMasterService.suspend(new FlinkException("JobManager is no longer the leader."), rpcTimeout);
 
 			suspendFuture.whenCompleteAsync(
 				(Acknowledge ack, Throwable throwable) -> {
@@ -396,7 +395,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, A
 
 	@Override
 	public String getAddress() {
-		return jobMaster.getAddress();
+		return jobMasterService.getAddress();
 	}
 
 	@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c63e5f5..3679d86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -145,7 +145,7 @@ import static org.apache.flink.util.Preconditions.checkState;
  * given task</li>
  * </ul>
  */
-public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
+public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway, JobMasterService {
 
 	/** Default names for Flink's distributed components. */
 	public static final String JOB_MANAGER_NAME = "jobmanager";
@@ -1500,6 +1500,21 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 	}
 
 	//----------------------------------------------------------------------------------------------
+	// Service methods
+	//----------------------------------------------------------------------------------------------
+
+	@Override
+	public JobMasterGateway getGateway() {
+		return getSelfGateway(JobMasterGateway.class); // this endpoint's self gateway, typed as JobMasterGateway
+	}
+
+	@Override
+	public CompletableFuture<Void> closeAsync() {
+		shutDown(); // same shutDown()/getTerminationFuture() sequence JobManagerRunner previously called directly
+		return getTerminationFuture(); // NOTE(review): assumed to complete once the endpoint has terminated — confirm against RpcEndpoint
+	}
+
+	//----------------------------------------------------------------------------------------------
 	// Utility classes
 	//----------------------------------------------------------------------------------------------
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java
new file mode 100644
index 0000000..09a5870
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterService.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.util.AutoCloseableAsync;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface exposing the JobMaster's lifecycle methods (start, suspend, close) plus its gateway and address.
+ */
+public interface JobMasterService extends AutoCloseableAsync {
+
+	/**
+	 * Start the JobMaster service with the given {@link JobMasterId}.
+	 *
+	 * @param jobMasterId to start the service with
+	 * @param rpcTimeout timeout of this operation
+	 * @return Future which is completed once the JobMaster service has been started
+	 * @throws Exception if the JobMaster service could not be started
+	 */
+	CompletableFuture<Acknowledge> start(JobMasterId jobMasterId, Time rpcTimeout) throws Exception;
+
+	/**
+	 * Suspend the JobMaster service. This means that the service will stop reacting
+	 * to messages.
+	 *
+	 * @param cause for the suspension
+	 * @param rpcTimeout timeout of this operation
+	 * @return Future which is completed once the JobMaster service has been suspended
+	 */
+	CompletableFuture<Acknowledge> suspend(Exception cause, Time rpcTimeout);
+
+	/**
+	 * Get the {@link JobMasterGateway} belonging to this service.
+	 *
+	 * @return JobMasterGateway belonging to this service
+	 */
+	JobMasterGateway getGateway();
+
+	/**
+	 * Get the address of the JobMaster service under which it is reachable.
+	 *
+	 * @return Address of the JobMaster service
+	 */
+	String getAddress();
+}