You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/07/10 09:36:17 UTC

[flink] 16/16: [hotfix][runtime] Minor cleanup in SlotManager

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit be794ac078360cf82435dca550f8690ca7abdca2
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 9 19:32:15 2019 +0200

    [hotfix][runtime] Minor cleanup in SlotManager
    
    Avoid checking whether a slot request is fulfillable by the registered slots
    unless failUnfulfillableRequest is set, since the result is only used in that case
---
 .../runtime/resourcemanager/slotmanager/SlotManager.java     | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index bea588b..71f3df6 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -758,13 +758,15 @@ public class SlotManager implements AutoCloseable {
 				pendingTaskManagerSlotOptional = allocateResource(resourceProfile);
 			}
 
-			pendingTaskManagerSlotOptional.ifPresent(pendingTaskManagerSlot -> assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlot));
-			if (!pendingTaskManagerSlotOptional.isPresent()) {
+			if (pendingTaskManagerSlotOptional.isPresent()) {
+				assignPendingTaskManagerSlot(pendingSlotRequest, pendingTaskManagerSlotOptional.get());
+			}
+			else {
 				// request can not be fulfilled by any free slot or pending slot that can be allocated,
 				// check whether it can be fulfilled by allocated slots
-				boolean fulfillable = isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile());
-				if (!fulfillable && failUnfulfillableRequest) {
-					throw new ResourceManagerException("Requested resource profile (" + pendingSlotRequest.getResourceProfile() + ") is unfulfillable.");
+				if (failUnfulfillableRequest && !isFulfillableByRegisteredSlots(pendingSlotRequest.getResourceProfile())) {
+					throw new ResourceManagerException("Requested resource profile (" +
+						pendingSlotRequest.getResourceProfile() + ") is unfulfillable.");
 				}
 			}
 		}