Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/06/03 04:23:44 UTC

[GitHub] [flink] shuai-xu commented on a change in pull request #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

URL: https://github.com/apache/flink/pull/7227#discussion_r289684168
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 ##########
 @@ -759,37 +759,43 @@ private void checkIdleSlot() {
 		final FlinkException cause = new FlinkException("Releasing idle slot.");
 
 		for (AllocatedSlot expiredSlot : expiredSlots) {
-			final AllocationID allocationID = expiredSlot.getAllocationId();
-			if (availableSlots.tryRemove(allocationID) != null) {
-
-				log.info("Releasing idle slot [{}].", allocationID);
-				final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
-					allocationID,
-					cause,
-					rpcTimeout);
-
-				FutureUtils.whenCompleteAsyncIfNotDone(
-					freeSlotFuture,
-					componentMainThreadExecutor,
-					(Acknowledge ignored, Throwable throwable) -> {
-						if (throwable != null) {
-							if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
-								log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
-										"Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
-									throwable);
-								tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
-							} else {
-								log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
-									"longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
-							}
-						}
-					});
+
+			if (availableSlots.tryRemove(expiredSlot.getAllocationId()) != null) {
+				releaseSlotToTaskManager(expiredSlot, cause);
 			}
 		}
 
 		scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
 	}
 
+	private void releaseSlotToTaskManager(AllocatedSlot expiredSlot, FlinkException cause) {
+		final AllocationID allocationID = expiredSlot.getAllocationId();
+		log.info("Releasing idle slot [{}].", allocationID);
+
+		final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
+				allocationID,
+				cause,
+				rpcTimeout);
+
+		FutureUtils.whenCompleteAsyncIfNotDone(
+				freeSlotFuture,
+				componentMainThreadExecutor,
+				(Acknowledge ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						if (throwable instanceof TimeoutException) {
+							log.debug("Releasing slot [{}] of registered TaskExecutor {} timeout. " +
+											"Trying to release it again.",
+									allocationID, expiredSlot.getTaskManagerId(), throwable);
+							releaseSlotToTaskManager(expiredSlot, cause);
 
 Review comment:
  Hi @tillrohrmann, thanks for reviewing this. I don't think adding a slot report between the JM and the TM is enough to fix the problem on its own. It only addresses how the JM tells the TM that it will no longer use a slot when the freeSlot request times out. But it can also happen that the TM has already released the slot while the request times out on the JM side. The JM may then assign the slot to another execution while the RM assigns it to someone else. A slot report cannot resolve this conflict between the JM and the TM.
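
To make the slot-report argument concrete, here is a minimal sketch of the comparison such a report would allow. It is hypothetical, not Flink code: it assumes the report is simply the set of allocation IDs the TaskExecutor still holds for this job, uses plain strings in place of `AllocationID`, and the names `SlotReportDiff`, `toFreeOnTaskExecutor` and `staleInJobMaster` are illustrative only.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sketch, not Flink code: compares the JobMaster's view of the slots it
 * holds on one TaskExecutor with that TaskExecutor's slot report.
 * Plain strings stand in for AllocationIDs.
 */
final class SlotReportDiff {

    /** Slots the TE still holds for this job although the JM has already released them. */
    final Set<String> toFreeOnTaskExecutor;

    /** Slots the JM still believes it holds although the TE no longer reports them. */
    final Set<String> staleInJobMaster;

    private SlotReportDiff(Set<String> toFree, Set<String> stale) {
        this.toFreeOnTaskExecutor = Collections.unmodifiableSet(toFree);
        this.staleInJobMaster = Collections.unmodifiableSet(stale);
    }

    static SlotReportDiff compare(Set<String> jobMasterView, Set<String> taskExecutorReport) {
        // Reported by the TE but unknown to the JM: the JM should ask the TE to free them.
        Set<String> toFree = new HashSet<>(taskExecutorReport);
        toFree.removeAll(jobMasterView);

        // Known to the JM but no longer held by the TE: the JM should drop them.
        Set<String> stale = new HashSet<>(jobMasterView);
        stale.removeAll(taskExecutorReport);

        return new SlotReportDiff(toFree, stale);
    }
}
```

For example, with a JM view of {a, b} and a report of {b, c}, the JM would ask the TE to free c and drop a from its own bookkeeping. The stale set is only known once a report arrives, though: if the JM has already handed such a slot to another execution in the meantime, the comparison reveals the conflict but does not prevent it.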
   
  One solution could be to extend the check in my fix. When the freeSlot request fails: if the TaskManager is no longer registered, drop the slot; otherwise, if the failure is a TimeoutException, retry; if it is a SlotNotFoundException, drop the slot; for any other exception, such as a closed connection (ConnectClosed), call tryFulfillSlotRequestOrMakeAvailable.
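
The decision rules above can be sketched as a pure function. This is a hypothetical sketch, not Flink code: `FreeSlotFailurePolicy` and `Action` are illustrative names, the nested `SlotNotFoundException` is a stand-in for the exception mentioned in the comment, and a plain boolean replaces the `registeredTaskManagers.contains(...)` check seen in the diff.

```java
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical sketch of the proposed classification of a failed freeSlot request.
 * Not Flink code: all names here are illustrative stand-ins.
 */
final class FreeSlotFailurePolicy {

    /** What the JobMaster should do with the slot after freeSlot failed. */
    enum Action { DROP, RETRY, MAKE_AVAILABLE }

    /** Stand-in for a "TaskExecutor does not know this slot" error (hypothetical). */
    static final class SlotNotFoundException extends Exception {
        SlotNotFoundException(String message) {
            super(message);
        }
    }

    private FreeSlotFailurePolicy() {}

    static Action classify(boolean taskManagerRegistered, Throwable failure) {
        // The owning TaskExecutor is gone: nobody can use the slot any more.
        if (!taskManagerRegistered) {
            return Action.DROP;
        }
        Throwable cause = unwrap(failure);
        if (cause instanceof TimeoutException) {
            // No clear answer yet: the TE may or may not have freed the slot.
            return Action.RETRY;
        }
        if (cause instanceof SlotNotFoundException) {
            // Clear answer: the TE no longer holds the slot.
            return Action.DROP;
        }
        // Any other failure (e.g. a closed connection): keep the slot usable
        // (tryFulfillSlotRequestOrMakeAvailable in the comment).
        return Action.MAKE_AVAILABLE;
    }

    /** Failures passing through future stages may arrive wrapped; peel the wrappers off. */
    private static Throwable unwrap(Throwable t) {
        Throwable current = t;
        while ((current instanceof CompletionException || current instanceof ExecutionException)
                && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }
}
```

Keeping the classification separate from the callback in `releaseSlotToTaskManager` would make each branch easy to unit-test. The sketch also unwraps `CompletionException`, because a failure that passes through a dependent `CompletableFuture` stage arrives wrapped in it, and a plain `instanceof TimeoutException` check would then miss the timeout.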
   
  Adding a TaskSubmissionException together with the slot report might also solve the problem. I think the underlying question is which behavior we choose when a request times out in a distributed system. One principle is to retry until we get a clear answer; another is to synchronize state in a background thread until the JM and TM views no longer conflict.
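
The first principle, retrying until a clear answer arrives, is what the `releaseSlotToTaskManager` change in the diff does for `TimeoutException`. A minimal, generic sketch with `CompletableFuture`, assuming the request is a `Supplier` that issues a fresh RPC on every call. `RetryUntilAnswer`, `retryOnTimeout` and the `maxAttempts` bound are hypothetical additions; the visible part of the diff does not bound its retries.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical sketch of "retry until a clear answer"; not Flink code. */
final class RetryUntilAnswer {

    private RetryUntilAnswer() {}

    /**
     * Re-issues the request while it fails with a TimeoutException. A successful
     * result or any other failure is treated as a clear answer and passed through.
     * maxAttempts is an added safety bound (must be at least 1).
     */
    static <T> CompletableFuture<T> retryOnTimeout(Supplier<CompletableFuture<T>> request, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        return request.get()
                .handle((value, error) -> {
                    // Failures of dependent stages arrive wrapped in CompletionException.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
                    if (cause instanceof TimeoutException && maxAttempts > 1) {
                        // No clear answer yet: send the request again.
                        return retryOnTimeout(request, maxAttempts - 1);
                    }
                    CompletableFuture<T> answer = new CompletableFuture<>();
                    if (cause != null) {
                        answer.completeExceptionally(cause); // clear "no", or attempts exhausted
                    } else {
                        answer.complete(value); // clear "yes"
                    }
                    return answer;
                })
                .thenCompose(next -> next);
    }
}
```

The second principle would instead leave the slot in an uncertain state and let a periodic background comparison of the JM and TM views settle it, trading extra RPCs on the hot path for a window in which the two sides may disagree.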
