You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/12/13 14:34:44 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #10555: [FLINK-15013] Fix non local slot selection and root slot resolution

tillrohrmann commented on a change in pull request #10555: [FLINK-15013] Fix non local slot selection and root slot resolution
URL: https://github.com/apache/flink/pull/10555#discussion_r357673281
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java
 ##########
 @@ -143,41 +144,44 @@ MultiTaskSlot createRootSlot(
 			SlotRequestId slotRequestId,
 			CompletableFuture<? extends SlotContext> slotContextFuture,
 			SlotRequestId allocatedSlotRequestId) {
+		final CompletableFuture<SlotContext> slotContextFutureAfterRootSlotResolution = new CompletableFuture<>();
 		final MultiTaskSlot rootMultiTaskSlot = new MultiTaskSlot(
 			slotRequestId,
-			slotContextFuture,
+			slotContextFutureAfterRootSlotResolution,
 			allocatedSlotRequestId);
 
 		LOG.debug("Create multi task slot [{}] in slot [{}].", slotRequestId, allocatedSlotRequestId);
 
 		allTaskSlots.put(slotRequestId, rootMultiTaskSlot);
-
 		unresolvedRootSlots.put(slotRequestId, rootMultiTaskSlot);
 
-		// add the root node to the set of resolved root nodes once the SlotContext future has
-		// been completed and we know the slot's TaskManagerLocation
-		slotContextFuture.whenComplete(
-			(SlotContext slotContext, Throwable throwable) -> {
-				if (slotContext != null) {
-					final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
+		FutureUtils.forward(
+			slotContextFuture.thenApply(
+				(SlotContext slotContext) -> {
+					// add the root node to the set of resolved root nodes once the SlotContext future has
+					// been completed and we know the slot's TaskManagerLocation
+					tryMarkSlotAsResolved(slotRequestId, slotContext);
+					return slotContext;
+				}),
+			slotContextFutureAfterRootSlotResolution);
 
-					if (resolvedRootNode != null) {
-						final AllocationID allocationId = slotContext.getAllocationId();
-						LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, allocationId);
+		return rootMultiTaskSlot;
+	}
 
-						final Map<AllocationID, MultiTaskSlot> innerMap = resolvedRootSlots.computeIfAbsent(
-							slotContext.getTaskManagerLocation(),
-							taskManagerLocation -> new HashMap<>(4));
+	private void tryMarkSlotAsResolved(SlotRequestId slotRequestId, SlotInfo slotInfo) {
+		final MultiTaskSlot resolvedRootNode = unresolvedRootSlots.remove(slotRequestId);
 
-						MultiTaskSlot previousValue = innerMap.put(allocationId, resolvedRootNode);
-						Preconditions.checkState(previousValue == null);
-					}
-				} else {
-					rootMultiTaskSlot.release(throwable);
-				}
-			});
+		if (resolvedRootNode != null) {
+			final AllocationID allocationId = slotInfo.getAllocationId();
+			LOG.trace("Fulfill multi task slot [{}] with slot [{}].", slotRequestId, allocationId);
 
-		return rootMultiTaskSlot;
+			final Map<AllocationID, MultiTaskSlot> innerMap = resolvedRootSlots.computeIfAbsent(
+				slotInfo.getTaskManagerLocation(),
+				taskManagerLocation -> new HashMap<>(4));
 
 Review comment:
   I think there is no benefit, and according to the coding guidelines this should be avoided. I will add another commit to remove it.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services