Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/24 13:55:02 UTC

[GitHub] [flink] azagrebin commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

azagrebin commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r414556677



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
##########
@@ -74,4 +81,20 @@
 			standaloneClusterStartupPeriodTime,
 			AkkaUtils.getTimeoutAsTime(configuration));
 	}
+
+	/**
+	 * Get the configuration for standalone ResourceManager, overwrite invalid configs.
+	 *
+	 * @param configuration configuration object
+	 * @return the configuration for standalone ResourceManager
+	 */
+	private static Configuration getConfigurationForStandaloneResourceManager(Configuration configuration) {

Review comment:
       ```suggestion
   	private static Configuration removeMaxSlotNumberIfSet(Configuration configuration) {
   ```
   Since this is now the only thing the method does, I would suggest a more explicit name.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -650,14 +658,48 @@ private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, Resource
 	@Nullable
 	private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
 		for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
-			if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+			if (isPendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, resourceProfile)) {
 				return pendingTaskManagerSlot;
 			}
 		}
 
 		return null;
 	}
 
+	private boolean isPendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile resourceProfile) {
+		return pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
+	}
+
+	private boolean isMaxSlotNumExceededAfterRegistration(SlotReport initialSlotReport) {
+		final int numReportedNewSlots = initialSlotReport.getNumSlotStatus();
+		final int numRegisteredSlots =  getNumberRegisteredSlots();
+		final int numPendingSlots = getNumberPendingTaskManagerSlots();
+
+		// check if the total number exceed before matching pending slot.
+		if (numRegisteredSlots + numPendingSlots + numReportedNewSlots <= maxSlotNum) {
+			return false;
+		}
+
+		// check how many exceed slots could be consumed by pending slot.
+		final int totalSlotNum = numRegisteredSlots + numPendingSlots + getNumNonPendingReportedNewSlots(initialSlotReport);
+		return totalSlotNum > maxSlotNum;
+	}
+
+	private int getNumNonPendingReportedNewSlots(SlotReport slotReport) {
+		final Set<TaskManagerSlotId> matchingPendingSlots = new HashSet<>();
+
+		for (SlotStatus slotStatus : slotReport) {
+			for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
+				if (!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId()) &&

Review comment:
       Why do we need `!matchingPendingSlots.contains(pendingTaskManagerSlot.getTaskManagerSlotId())`? It does not look like `SlotReport` can contain duplicate `TaskManagerSlotId`s.
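
   One possible reading of the check, not confirmed in this thread: the set may be there to stop two reported slots from being paired with the same pending slot, rather than to guard against duplicate ids in the report. The sketch below illustrates that reading with hypothetical stand-ins (profiles as plain strings, pending slots as an id-to-profile map, and the `PendingSlotMatchingSketch` class and `trackMatched` flag are invented for illustration); it is not the Flink implementation.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in, not Flink code: profiles are plain strings and
// pending slots are a map from pending-slot id to requested profile.
class PendingSlotMatchingSketch {

    // Counts reported slots that cannot be paired with a pending slot.
    // With trackMatched == true, each pending slot is consumed at most once.
    static int countNonPending(List<String> reportedProfiles,
                               Map<String, String> pendingSlots,
                               boolean trackMatched) {
        Set<String> matched = new HashSet<>();
        int nonPending = 0;
        for (String reported : reportedProfiles) {
            boolean found = false;
            for (Map.Entry<String, String> pending : pendingSlots.entrySet()) {
                boolean alreadyUsed = trackMatched && matched.contains(pending.getKey());
                if (!alreadyUsed && pending.getValue().equals(reported)) {
                    matched.add(pending.getKey());
                    found = true;
                    break;
                }
            }
            if (!found) {
                nonPending++;
            }
        }
        return nonPending;
    }

    public static void main(String[] args) {
        Map<String, String> pending = new LinkedHashMap<>();
        pending.put("pending-1", "1cpu-1gb");
        List<String> reported = Arrays.asList("1cpu-1gb", "1cpu-1gb");

        // One pending slot, two identical reported slots.
        System.out.println(countNonPending(reported, pending, true));  // prints 1
        System.out.println(countNonPending(reported, pending, false)); // prints 0
    }
}
```

   Under these assumptions, dropping the check lets both reported slots match the single pending slot, so the count of non-pending slots comes out one too low.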

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -390,6 +392,12 @@ public void registerTaskManager(final TaskExecutorConnection taskExecutorConnect
 		if (taskManagerRegistrations.containsKey(taskExecutorConnection.getInstanceID())) {
 			reportSlotStatus(taskExecutorConnection.getInstanceID(), initialSlotReport);
 		} else {
+			if (isMaxSlotNumExceededAfterRegistration(initialSlotReport)) {
+				LOG.info("The total number of slots exceeds the max limitation {}, release the excess resource.", maxSlotNum);

Review comment:
       If we never allocate more than the maximum in active RMs (`allocateResource`), can this happen at all?
   Standalone mode would be the exception, but there we say we do not want this limitation.
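
   To make the question concrete, here is a minimal sketch of the arithmetic in `isMaxSlotNumExceededAfterRegistration`, using plain ints instead of Flink types. The class name, the parameter names, and the `reportedMatchingPending` input are hypothetical; in the diff that quantity comes out of the pending-slot matching loop rather than being passed in.

```java
// Hypothetical stand-in for the two-stage check in the diff; not Flink code.
class MaxSlotCheckSketch {

    static boolean isMaxSlotNumExceeded(int registered,
                                        int pending,
                                        int reported,
                                        int reportedMatchingPending,
                                        int maxSlotNum) {
        // Stage 1: cheap upper bound, treating every reported slot as new.
        if (registered + pending + reported <= maxSlotNum) {
            return false;
        }
        // Stage 2: reported slots that fulfil a pending slot are already
        // counted in `pending`, so only the unmatched ones add to the total.
        int nonPendingReported = reported - reportedMatchingPending;
        return registered + pending + nonPendingReported > maxSlotNum;
    }

    public static void main(String[] args) {
        // Limit 4, 2 registered, 2 pending, 2 reported.
        System.out.println(isMaxSlotNumExceeded(2, 2, 2, 2, 4)); // prints false
        System.out.println(isMaxSlotNumExceeded(2, 2, 2, 0, 4)); // prints true
    }
}
```

   In this sketch, if `registered + pending` never exceeds the limit, the check can only return true when at least one reported slot has no matching pending slot.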
   



