Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/06/04 09:29:11 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots

tillrohrmann commented on a change in pull request #7227: [FLINK-11059] [runtime] do not add releasing failed slot to free slots
URL: https://github.com/apache/flink/pull/7227#discussion_r290207018
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 ##########
 @@ -759,37 +759,43 @@ private void checkIdleSlot() {
 		final FlinkException cause = new FlinkException("Releasing idle slot.");
 
 		for (AllocatedSlot expiredSlot : expiredSlots) {
-			final AllocationID allocationID = expiredSlot.getAllocationId();
-			if (availableSlots.tryRemove(allocationID) != null) {
-
-				log.info("Releasing idle slot [{}].", allocationID);
-				final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
-					allocationID,
-					cause,
-					rpcTimeout);
-
-				FutureUtils.whenCompleteAsyncIfNotDone(
-					freeSlotFuture,
-					componentMainThreadExecutor,
-					(Acknowledge ignored, Throwable throwable) -> {
-						if (throwable != null) {
-							if (registeredTaskManagers.contains(expiredSlot.getTaskManagerId())) {
-								log.debug("Releasing slot [{}] of registered TaskExecutor {} failed. " +
-										"Trying to fulfill a different slot request.", allocationID, expiredSlot.getTaskManagerId(),
-									throwable);
-								tryFulfillSlotRequestOrMakeAvailable(expiredSlot);
-							} else {
-								log.debug("Releasing slot [{}] failed and owning TaskExecutor {} is no " +
-									"longer registered. Discarding slot.", allocationID, expiredSlot.getTaskManagerId());
-							}
-						}
-					});
+
+			if (availableSlots.tryRemove(expiredSlot.getAllocationId()) != null) {
+				releaseSlotToTaskManager(expiredSlot, cause);
 			}
 		}
 
 		scheduleRunAsync(this::checkIdleSlot, idleSlotTimeout);
 	}
 
+	private void releaseSlotToTaskManager(AllocatedSlot expiredSlot, FlinkException cause) {
+		final AllocationID allocationID = expiredSlot.getAllocationId();
+		log.info("Releasing idle slot [{}].", allocationID);
+
+		final CompletableFuture<Acknowledge> freeSlotFuture = expiredSlot.getTaskManagerGateway().freeSlot(
+				allocationID,
+				cause,
+				rpcTimeout);
+
+		FutureUtils.whenCompleteAsyncIfNotDone(
+				freeSlotFuture,
+				componentMainThreadExecutor,
+				(Acknowledge ignored, Throwable throwable) -> {
+					if (throwable != null) {
+						if (throwable instanceof TimeoutException) {
+							log.debug("Releasing slot [{}] of registered TaskExecutor {} timeout. " +
+											"Trying to release it again.",
+									allocationID, expiredSlot.getTaskManagerId(), throwable);
+							releaseSlotToTaskManager(expiredSlot, cause);
 
 Review comment:
   I don't think that is right, @shuai-xu. Based on the heartbeat from the JM to the TM, the TM could free all slots which it thinks are still assigned to the JM but which the JM no longer claims, and fail those which the JM thinks it still owns but does not (via `JobMasterGateway#failSlot`). Alternatively, the TM could send its own slot report back to the JM as the heartbeat's payload so that both sides can reconcile.
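
To make the reconciliation idea concrete, here is a minimal sketch of how the two views could be compared once they are exchanged via heartbeat. It is not Flink code: the class `SlotReconciliation`, its fields, and the use of plain strings in place of `AllocationID` are assumptions made for illustration only.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical helper, not part of Flink: compares the JM's and the TM's view of slot ownership. */
final class SlotReconciliation {
    /** Slots the TM still holds for the JM although the JM no longer claims them. */
    final Set<String> toFreeOnTaskManager;
    /** Slots the JM still claims although the TM does not hold them for it. */
    final Set<String> toFailOnJobMaster;

    private SlotReconciliation(Set<String> toFree, Set<String> toFail) {
        this.toFreeOnTaskManager = toFree;
        this.toFailOnJobMaster = toFail;
    }

    /**
     * @param jobMasterView allocation ids the JM believes it owns on this TM
     * @param taskManagerView allocation ids the TM believes are assigned to this JM
     */
    static SlotReconciliation reconcile(Set<String> jobMasterView, Set<String> taskManagerView) {
        // TM-only slots: a lost freeSlot message would leave them here, so the TM can free them.
        Set<String> toFree = new HashSet<>(taskManagerView);
        toFree.removeAll(jobMasterView);

        // JM-only slots: the JM should be told to fail them (the role failSlot plays in the comment above).
        Set<String> toFail = new HashSet<>(jobMasterView);
        toFail.removeAll(taskManagerView);

        return new SlotReconciliation(toFree, toFail);
    }
}
```

With such a comparison running on every heartbeat, a single lost `freeSlot` call is repaired at the next heartbeat instead of needing an unbounded retry loop.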
   
   I think in general it is not a good idea to retry things forever in distributed systems because you never know how long an outage may last. It could also be the case that only the `freeSlot` messages get lost for whatever reason. Instead, a far better approach is to retry for a bounded time or number of attempts and then rely on another means of converging to a consistent view of the system state (e.g. by receiving a special exception saying that one does not own the slot anymore, or by having a periodic heartbeat which allows both sides to reconcile their state).
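
As a rough illustration of "retry for some attempts, then stop", the following sketch bounds the number of retries on timeout and surfaces the final failure to the caller. `BoundedRetry` and its methods are hypothetical names and do not exist in Flink; the sketch uses only `java.util.concurrent` and ignores executor and threading concerns such as `componentMainThreadExecutor`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Flink: retries an async operation a bounded number of times. */
final class BoundedRetry {

    /**
     * Invokes the operation at most maxAttempts times, retrying only when an attempt fails
     * with a TimeoutException. Any other failure, or running out of attempts, completes the
     * returned future exceptionally.
     */
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> operation, int maxAttempts) {
        final CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> operation, int remaining, CompletableFuture<T> result) {
        operation.get().whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);
            } else if (unwrap(failure) instanceof TimeoutException && remaining > 1) {
                // Timed out and attempts are left: try again with one attempt fewer.
                attempt(operation, remaining - 1, result);
            } else {
                // Give up; the caller falls back to another convergence mechanism.
                result.completeExceptionally(unwrap(failure));
            }
        });
    }

    /** Strips the CompletionException wrapper that dependent stages may add. */
    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

In the setting of this PR, the operation would wrap the `freeSlot` call, and an exceptionally completed result would mean the slot pool stops retrying and leaves the cleanup to the heartbeat-based reconciliation.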
