You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/12/04 14:48:44 UTC

[GitHub] [flink] tillrohrmann commented on a change in pull request #13879: [FLINK-19832][coordination] Do not schedule shared slot bulk if some slots have failed immediately

tillrohrmann commented on a change in pull request #13879:
URL: https://github.com/apache/flink/pull/13879#discussion_r536139471



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexId, Throw
 			ExecutionSlotSharingGroup group = entry.getKey();
 			List<ExecutionVertexID> executionIds = entry.getValue();
 			SharedSlot sharedSlot = getOrAllocateSharedSlot(group, sharedSlotProfileRetriever);
-
-			for (ExecutionVertexID executionId : executionIds) {
-				CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
-				SlotExecutionVertexAssignment assignment = new SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
-				assignments.put(executionId, assignment);
-			}
+			allocateLogicalSlotsFromSharedSlot(assignments, executionIds, sharedSlot);
 		}
 
 		return assignments;
 	}
 
 	private SharedSlot getOrAllocateSharedSlot(
-			ExecutionSlotSharingGroup executionSlotSharingGroup,
+			ExecutionSlotSharingGroup slotSharingGroup,
 			SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-		return sharedSlots
-			.computeIfAbsent(executionSlotSharingGroup, group -> {
-				SlotRequestId physicalSlotRequestId = new SlotRequestId();
-				ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);
-				SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
-				PhysicalSlotRequest physicalSlotRequest =
-					new PhysicalSlotRequest(physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
-				CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
+		SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+		if (sharedSlot == null) {
+			SlotRequestId physicalSlotRequestId = new SlotRequestId();
+			ResourceProfile physicalSlotResourceProfile =
+					getPhysicalSlotResourceProfile(slotSharingGroup);
+			SlotProfile slotProfile = sharedSlotProfileRetriever
+					.getSlotProfile(slotSharingGroup, physicalSlotResourceProfile);
+			PhysicalSlotRequest physicalSlotRequest = new PhysicalSlotRequest(
+					physicalSlotRequestId,
+					slotProfile,
+					slotWillBeOccupiedIndefinitely);
+			CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
 					.allocatePhysicalSlot(physicalSlotRequest)
 					.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-				return new SharedSlot(
+			sharedSlot = new SharedSlot(
 					physicalSlotRequestId,
 					physicalSlotResourceProfile,
-					group,
+					slotSharingGroup,
 					physicalSlotFuture,
 					slotWillBeOccupiedIndefinitely,
 					this::releaseSharedSlot);
-			});
+			if (!physicalSlotFuture.isCompletedExceptionally()) {
+				sharedSlots.put(slotSharingGroup, sharedSlot);
+			}
+		}
+		return sharedSlot;
+	}
+
+	private static void allocateLogicalSlotsFromSharedSlot(
+			Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments,
+			Iterable<ExecutionVertexID> executionIds,
+			SharedSlot sharedSlot) {
+		for (ExecutionVertexID executionId : executionIds) {
+			boolean physicalSlotHasAlreadyFailed = sharedSlot
+					.getSlotContextFuture()
+					.isCompletedExceptionally();

Review comment:
       We should be able to move this check out of the `for` loop, since it does not depend on `executionId`.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexId, Throw
 			ExecutionSlotSharingGroup group = entry.getKey();
 			List<ExecutionVertexID> executionIds = entry.getValue();
 			SharedSlot sharedSlot = getOrAllocateSharedSlot(group, sharedSlotProfileRetriever);
-
-			for (ExecutionVertexID executionId : executionIds) {
-				CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
-				SlotExecutionVertexAssignment assignment = new SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
-				assignments.put(executionId, assignment);
-			}
+			allocateLogicalSlotsFromSharedSlot(assignments, executionIds, sharedSlot);
 		}
 
 		return assignments;
 	}
 
 	private SharedSlot getOrAllocateSharedSlot(
-			ExecutionSlotSharingGroup executionSlotSharingGroup,
+			ExecutionSlotSharingGroup slotSharingGroup,
 			SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-		return sharedSlots
-			.computeIfAbsent(executionSlotSharingGroup, group -> {
-				SlotRequestId physicalSlotRequestId = new SlotRequestId();
-				ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);
-				SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
-				PhysicalSlotRequest physicalSlotRequest =
-					new PhysicalSlotRequest(physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
-				CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
+		SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+		if (sharedSlot == null) {
+			SlotRequestId physicalSlotRequestId = new SlotRequestId();
+			ResourceProfile physicalSlotResourceProfile =
+					getPhysicalSlotResourceProfile(slotSharingGroup);
+			SlotProfile slotProfile = sharedSlotProfileRetriever
+					.getSlotProfile(slotSharingGroup, physicalSlotResourceProfile);
+			PhysicalSlotRequest physicalSlotRequest = new PhysicalSlotRequest(
+					physicalSlotRequestId,
+					slotProfile,
+					slotWillBeOccupiedIndefinitely);
+			CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
 					.allocatePhysicalSlot(physicalSlotRequest)
 					.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-				return new SharedSlot(
+			sharedSlot = new SharedSlot(
 					physicalSlotRequestId,
 					physicalSlotResourceProfile,
-					group,
+					slotSharingGroup,
 					physicalSlotFuture,
 					slotWillBeOccupiedIndefinitely,
 					this::releaseSharedSlot);
-			});
+			if (!physicalSlotFuture.isCompletedExceptionally()) {
+				sharedSlots.put(slotSharingGroup, sharedSlot);
+			}
+		}
+		return sharedSlot;
+	}
+
+	private static void allocateLogicalSlotsFromSharedSlot(
+			Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments,
+			Iterable<ExecutionVertexID> executionIds,
+			SharedSlot sharedSlot) {
+		for (ExecutionVertexID executionId : executionIds) {
+			boolean physicalSlotHasAlreadyFailed = sharedSlot
+					.getSlotContextFuture()
+					.isCompletedExceptionally();
+			CompletableFuture<LogicalSlot> logicalSlotFuture = physicalSlotHasAlreadyFailed ?
+					sharedSlot.getSlotContextFuture().thenApply(stub -> null) :
+					sharedSlot.allocateLogicalSlot(executionId);
+			SlotExecutionVertexAssignment assignment =
+					new SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
+			assignments.put(executionId, assignment);

Review comment:
       I am not a huge fan of returning values through a parameter. I think returning the assignments can also work here.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -213,20 +235,24 @@ private ResourceProfile getPhysicalSlotResourceProfile(ExecutionSlotSharingGroup
 			.reduce(ResourceProfile.ZERO, (r, e) -> r.merge(resourceProfileRetriever.apply(e)), ResourceProfile::merge);
 	}
 
-	private SharingPhysicalSlotRequestBulk createBulk(Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
-		Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests = executions
-			.keySet()
-			.stream()
-			.collect(Collectors.toMap(
-				group -> group,
-				group -> sharedSlots.get(group).getPhysicalSlotResourceProfile()
-			));
+	private Optional<SharingPhysicalSlotRequestBulk> createBulk(
+			Map<ExecutionSlotSharingGroup, List<ExecutionVertexID>> executions) {
+		Map<ExecutionSlotSharingGroup, ResourceProfile> pendingRequests = new HashMap<>();
+		for (ExecutionSlotSharingGroup group : executions.keySet()) {
+			SharedSlot sharedSlot = sharedSlots.get(group);
+			if (sharedSlot == null || sharedSlot.getSlotContextFuture().isCompletedExceptionally()) {
+				// there is no shared slot for this group or its physical slot has already failed
+				// hence there is no point to track the whole bulk
+				return Optional.empty();
+			}

Review comment:
       This looks like a similar kind of special-case handling. What exactly makes these two cases different: 1) the physical slot future has already failed when this method runs, and 2) the physical slot future fails just after this method completes?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexId, Throw
 			ExecutionSlotSharingGroup group = entry.getKey();
 			List<ExecutionVertexID> executionIds = entry.getValue();
 			SharedSlot sharedSlot = getOrAllocateSharedSlot(group, sharedSlotProfileRetriever);
-
-			for (ExecutionVertexID executionId : executionIds) {
-				CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
-				SlotExecutionVertexAssignment assignment = new SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
-				assignments.put(executionId, assignment);
-			}
+			allocateLogicalSlotsFromSharedSlot(assignments, executionIds, sharedSlot);
 		}
 
 		return assignments;
 	}
 
 	private SharedSlot getOrAllocateSharedSlot(
-			ExecutionSlotSharingGroup executionSlotSharingGroup,
+			ExecutionSlotSharingGroup slotSharingGroup,
 			SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-		return sharedSlots
-			.computeIfAbsent(executionSlotSharingGroup, group -> {
-				SlotRequestId physicalSlotRequestId = new SlotRequestId();
-				ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);
-				SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
-				PhysicalSlotRequest physicalSlotRequest =
-					new PhysicalSlotRequest(physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
-				CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
+		SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+		if (sharedSlot == null) {
+			SlotRequestId physicalSlotRequestId = new SlotRequestId();
+			ResourceProfile physicalSlotResourceProfile =
+					getPhysicalSlotResourceProfile(slotSharingGroup);
+			SlotProfile slotProfile = sharedSlotProfileRetriever
+					.getSlotProfile(slotSharingGroup, physicalSlotResourceProfile);
+			PhysicalSlotRequest physicalSlotRequest = new PhysicalSlotRequest(
+					physicalSlotRequestId,
+					slotProfile,
+					slotWillBeOccupiedIndefinitely);
+			CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
 					.allocatePhysicalSlot(physicalSlotRequest)
 					.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-				return new SharedSlot(
+			sharedSlot = new SharedSlot(
 					physicalSlotRequestId,
 					physicalSlotResourceProfile,
-					group,
+					slotSharingGroup,
 					physicalSlotFuture,
 					slotWillBeOccupiedIndefinitely,
 					this::releaseSharedSlot);
-			});
+			if (!physicalSlotFuture.isCompletedExceptionally()) {
+				sharedSlots.put(slotSharingGroup, sharedSlot);
+			}

Review comment:
       Hmm, this looks a bit like special-case handling. I guess it would be a bit nicer if the case where the future is already completed exceptionally and the case where it is completed exceptionally at a later point were handled the same way. What exactly is the difference between the `physicalSlotFuture` being completed exceptionally right away and the `physicalSlotFuture` being completed exceptionally just after the `SharedSlot` has been created?

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
##########
@@ -160,38 +162,58 @@ private void cancelLogicalSlotRequest(ExecutionVertexID executionVertexId, Throw
 			ExecutionSlotSharingGroup group = entry.getKey();
 			List<ExecutionVertexID> executionIds = entry.getValue();
 			SharedSlot sharedSlot = getOrAllocateSharedSlot(group, sharedSlotProfileRetriever);
-
-			for (ExecutionVertexID executionId : executionIds) {
-				CompletableFuture<LogicalSlot> logicalSlotFuture = sharedSlot.allocateLogicalSlot(executionId);
-				SlotExecutionVertexAssignment assignment = new SlotExecutionVertexAssignment(executionId, logicalSlotFuture);
-				assignments.put(executionId, assignment);
-			}
+			allocateLogicalSlotsFromSharedSlot(assignments, executionIds, sharedSlot);
 		}
 
 		return assignments;
 	}
 
 	private SharedSlot getOrAllocateSharedSlot(
-			ExecutionSlotSharingGroup executionSlotSharingGroup,
+			ExecutionSlotSharingGroup slotSharingGroup,
 			SharedSlotProfileRetriever sharedSlotProfileRetriever) {
-		return sharedSlots
-			.computeIfAbsent(executionSlotSharingGroup, group -> {
-				SlotRequestId physicalSlotRequestId = new SlotRequestId();
-				ResourceProfile physicalSlotResourceProfile = getPhysicalSlotResourceProfile(group);
-				SlotProfile slotProfile = sharedSlotProfileRetriever.getSlotProfile(group, physicalSlotResourceProfile);
-				PhysicalSlotRequest physicalSlotRequest =
-					new PhysicalSlotRequest(physicalSlotRequestId, slotProfile, slotWillBeOccupiedIndefinitely);
-				CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
+		SharedSlot sharedSlot = sharedSlots.get(slotSharingGroup);
+		if (sharedSlot == null) {
+			SlotRequestId physicalSlotRequestId = new SlotRequestId();
+			ResourceProfile physicalSlotResourceProfile =
+					getPhysicalSlotResourceProfile(slotSharingGroup);
+			SlotProfile slotProfile = sharedSlotProfileRetriever
+					.getSlotProfile(slotSharingGroup, physicalSlotResourceProfile);
+			PhysicalSlotRequest physicalSlotRequest = new PhysicalSlotRequest(
+					physicalSlotRequestId,
+					slotProfile,
+					slotWillBeOccupiedIndefinitely);
+			CompletableFuture<PhysicalSlot> physicalSlotFuture = slotProvider
 					.allocatePhysicalSlot(physicalSlotRequest)
 					.thenApply(PhysicalSlotRequest.Result::getPhysicalSlot);
-				return new SharedSlot(
+			sharedSlot = new SharedSlot(
 					physicalSlotRequestId,
 					physicalSlotResourceProfile,
-					group,
+					slotSharingGroup,
 					physicalSlotFuture,
 					slotWillBeOccupiedIndefinitely,
 					this::releaseSharedSlot);
-			});
+			if (!physicalSlotFuture.isCompletedExceptionally()) {
+				sharedSlots.put(slotSharingGroup, sharedSlot);
+			}
+		}
+		return sharedSlot;
+	}
+
+	private static void allocateLogicalSlotsFromSharedSlot(
+			Map<ExecutionVertexID, SlotExecutionVertexAssignment> assignments,
+			Iterable<ExecutionVertexID> executionIds,
+			SharedSlot sharedSlot) {
+		for (ExecutionVertexID executionId : executionIds) {
+			boolean physicalSlotHasAlreadyFailed = sharedSlot
+					.getSlotContextFuture()
+					.isCompletedExceptionally();
+			CompletableFuture<LogicalSlot> logicalSlotFuture = physicalSlotHasAlreadyFailed ?
+					sharedSlot.getSlotContextFuture().thenApply(stub -> null) :
+					sharedSlot.allocateLogicalSlot(executionId);

Review comment:
       Can't `SharedSlot.allocateLogicalSlot` return a failed future if the internal future has already failed? That way, we wouldn't need the special-casing here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org