You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/21 09:59:35 UTC

[GitHub] [flink] zentol commented on a change in pull request #14431: [FLINK-11719] Do not reuse JobMaster instances across leader sessions

zentol commented on a change in pull request #14431:
URL: https://github.com/apache/flink/pull/14431#discussion_r546605468



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -797,52 +796,75 @@ private void startJobExecution() throws Exception {
 	}
 
 	private void startJobMasterServices() throws Exception {
-		// start the slot pool make sure the slot pool now accepts messages for this leader
-		slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
-
-		//TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
-		// try to reconnect to previously known leader
-		reconnectToResourceManager(new FlinkException("Starting JobMaster component."));
-
-		// job is ready to go, try to establish connection with resource manager
-		//   - activate leader retrieval for the resource manager
-		//   - on notification of the leader, the connection will be established and
-		//     the slot pool will start requesting slots
-		resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
+		try {
+			// start the slot pool make sure the slot pool now accepts messages for this leader
+			slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());
+
+			// job is ready to go, try to establish connection with resource manager
+			//   - activate leader retrieval for the resource manager
+			//   - on notification of the leader, the connection will be established and
+			//     the slot pool will start requesting slots
+			resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
+		} catch (Exception e) {
+			handleStartJobMasterServicesError(e);
+		}
 	}
 
-	/**
-	 * Suspending job, all the running tasks will be cancelled, and communication with other components
-	 * will be disposed.
-	 *
-	 * @param cause The reason of why this job been suspended.
-	 */
-	private void suspendExecution(final Exception cause) {
-		validateRunsInMainThread();
+	private void handleStartJobMasterServicesError(Exception e) throws Exception {
+		try {
+			stopJobMasterServices();
+		} catch (Exception inner) {
+			e.addSuppressed(inner);
+		}
+
+		throw e;
+	}
+
+	private void stopJobMasterServices() throws Exception {
+		Exception resultingException = null;
 
 		try {
 			resourceManagerLeaderRetriever.stop();
-			resourceManagerAddress = null;
-		} catch (Throwable t) {
-			log.warn("Failed to stop resource manager leader retriever when suspending.", t);
+		} catch (Exception e) {
+			resultingException = e;
 		}
 
-		suspendScheduler(cause);
+		// TODO: Distinguish between job termination which should free all slots and a loss of leadership which should keep the slots
+		slotPool.close();
+
+		stopHeartbeatServices();
+
+		ExceptionUtils.tryRethrowException(resultingException);
+	}
+
+	private void stopJobExecution(final Exception cause) throws Exception {
+		validateRunsInMainThread();
+
+		Exception resultingException = null;

Review comment:
       Move this closer to the try block?

##########
File path: flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterSchedulerTest.java
##########
@@ -72,16 +70,13 @@ public void testIfStartSchedulingFailsJobMasterFailsFatally() throws Exception {
 		System.setSecurityManager(trackingSecurityManager);
 
 		final SchedulerNGFactory schedulerFactory = new FailingSchedulerFactory();
-		final JobMaster jobMaster = new JobMasterBuilder(new JobGraph(), TESTING_RPC_SERVICE_RESOURCE
-			.getTestingRpcService())
+		final JobMaster jobMaster = new JobMasterBuilder(new JobGraph(), TESTING_RPC_SERVICE_RESOURCE.getTestingRpcService())
 			.withSchedulerFactory(schedulerFactory)
 			.createJobMaster();
 
-		final CompletableFuture<Acknowledge> startFuture = jobMaster.start(JobMasterId.generate());
+		jobMaster.start();
 
 		try {
-			startFuture.join();
-
 			assertThat(trackingSecurityManager.getSystemExitFuture().join(), is(FatalExitExceptionHandler.EXIT_CODE));

Review comment:
       This test got stuck on CI; I suppose we never call the fatal error handler since an error during the start of the scheduling now prevents the RpcEndpoint from starting altogether.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org