Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2020/04/20 11:53:46 UTC

[GitHub] [flink] GJL commented on a change in pull request #11615: [FLINK-16605] Add max limitation to the total number of slots

GJL commented on a change in pull request #11615:
URL: https://github.com/apache/flink/pull/11615#discussion_r411236972



##########
File path: flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
##########
@@ -76,6 +77,16 @@
 		.withDeprecatedKeys("yarn.heap-cutoff-min")
 		.withDescription("Minimum amount of heap memory to remove in Job Master containers, as a safety margin.");
 
+	@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+	public static final ConfigOption<Integer> MAX_SLOT_NUM = ConfigOptions
+		.key("slotmanager.number-of-slots.max")
+		.intType()
+		.defaultValue(Integer.MAX_VALUE)
+		.withDescription("Defines the maximum number of slots that the Flink cluster allocates. This configuration option " +
+			"is meant for limiting the memory consumption for batch workloads. It is not recommended to configure this option " +

Review comment:
       Maybe _memory consumption_ -> _resource consumption_?
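For context, the option in the diff defaults to `Integer.MAX_VALUE`, so the cap is effectively off unless a user sets it. The sketch below shows that resolution logic under stated assumptions. `IntOptionSketch` is a hypothetical class, and a plain `Map<String, String>` stands in for Flink's `ConfigOption`/`Configuration`. It is not Flink's API.

```java
import java.util.Map;

/** Hypothetical stand-in for a typed integer config option (not Flink's ConfigOption). */
final class IntOptionSketch {
    private final String key;
    private final int defaultValue;

    IntOptionSketch(String key, int defaultValue) {
        this.key = key;
        this.defaultValue = defaultValue;
    }

    String key() {
        return key;
    }

    /** Returns the configured value, or the default when the key is absent. */
    int resolve(Map<String, String> config) {
        final String raw = config.get(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    /** Mirrors the diff: Integer.MAX_VALUE by default, i.e. effectively no limit. */
    static final IntOptionSketch MAX_SLOT_NUM =
        new IntOptionSketch("slotmanager.number-of-slots.max", Integer.MAX_VALUE);
}
```

With an empty map, `MAX_SLOT_NUM.resolve(...)` yields `Integer.MAX_VALUE`. A batch user who wants to bound resource usage would set the key explicitly.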

##########
File path: docs/_includes/generated/expert_scheduling_section.html
##########
@@ -14,6 +14,12 @@
             <td>Boolean</td>
             <td>Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available <span markdown="span">`TaskExecutors`</span>.</td>
         </tr>
+        <tr>
+            <td><h5>slotmanager.number-of-slots.max</h5></td>

Review comment:
       I do not think that `max` should be a level of the key hierarchy; it should rather be part of the property name itself, i.e., `max-number-of-slots`. A quick search in the config docs shows several other occurrences of the `max-xyz` pattern: https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/config.html
   
   WDYT?
   
   cc: @xintongsong 

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManagerFactory.java
##########
@@ -74,4 +81,21 @@
 			standaloneClusterStartupPeriodTime,
 			AkkaUtils.getTimeoutAsTime(configuration));
 	}
+
+	/**
+	 * Get the configuration for standalone ResourceManager, overwrite invalid configs.
+	 *
+	 * @param configuration configuration object
+	 * @return the configuration for standalone ResourceManager
+	 */
+	public static Configuration getConfigurationForStandaloneResourceManager(Configuration configuration) {

Review comment:
       Is `public` access needed?
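The Javadoc says the method overwrites invalid configs for the standalone ResourceManager. The question is whether it must be `public`. The sketch below shows a package-private variant under two assumptions that the diff does not state. First, the setting to drop is the new slot cap, presumably because a standalone ResourceManager does not start TaskExecutors on demand. Second, a plain `Map<String, String>` stands in for Flink's `Configuration`. The class name `StandaloneConfigSketch` is hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch; Map<String, String> stands in for Flink's Configuration. */
final class StandaloneConfigSketch {
    static final String MAX_SLOT_NUM_KEY = "slotmanager.number-of-slots.max";

    private StandaloneConfigSketch() {}

    /**
     * Package-private: callers in the same package can use it without widening the API.
     * Returns a copy so the caller's configuration is never mutated.
     */
    static Map<String, String> getConfigurationForStandaloneResourceManager(Map<String, String> configuration) {
        final Map<String, String> copy = new HashMap<>(configuration);
        // Assumption: the slot cap is the "invalid config" to overwrite in standalone mode.
        if (copy.remove(MAX_SLOT_NUM_KEY) != null) {
            System.err.println("Ignoring " + MAX_SLOT_NUM_KEY + " in standalone mode.");
        }
        return copy;
    }
}
```

Returning a copy keeps the caller's configuration untouched. If no caller outside `org.apache.flink.runtime.resourcemanager` needs the method, package-private access would be enough.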

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -646,14 +658,52 @@ private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, Resource
 	@Nullable
 	private PendingTaskManagerSlot findExactlyMatchingPendingTaskManagerSlot(ResourceProfile resourceProfile) {
 		for (PendingTaskManagerSlot pendingTaskManagerSlot : pendingSlots.values()) {
-			if (pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile)) {
+			if (pendingSlotExactlyMatchingResourceProfile(pendingTaskManagerSlot, resourceProfile)) {
 				return pendingTaskManagerSlot;
 			}
 		}
 
 		return null;
 	}
 
+	private boolean pendingSlotExactlyMatchingResourceProfile(PendingTaskManagerSlot pendingTaskManagerSlot, ResourceProfile resourceProfile) {
+		return pendingTaskManagerSlot.getResourceProfile().equals(resourceProfile);
+	}
+
+	private boolean totalSlotNumExceedMaxLimitAfterRegister(SlotReport slotReport) {
+		final int numReportedNewSlots = slotReport.getNumSlotStatus();
+		final int numRegisteredSlots =  getNumberRegisteredSlots();
+		final int numPendingSlots = getNumberPendingTaskManagerSlots();
+
+		// check if the total number exceed before matching pending slot.
+		if (numRegisteredSlots + numPendingSlots + numReportedNewSlots <= maxSlotNum) {
+			return false;
+		}
+
+		// check how many exceed slots could be consumed by pending slot.
+		final int totalSlotNum = numRegisteredSlots + numPendingSlots + getNumNonPendingReportedNewSlots(slotReport, numReportedNewSlots);
+		if (totalSlotNum > maxSlotNum) {
+			return true;
+		}
+
+		return false;

Review comment:
       ```suggestion
   		return totalSlotNum > maxSlotNum;
   ```
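The suggestion folds the final `if` into a single boolean expression. The sketch below shows the whole two-stage check with the suggestion applied. It uses plain `int` counts instead of `SlotReport` and the SlotManager getters. `numReportedMatchingPending` is a hypothetical input: the number of reported slots that would fulfil a pending slot instead of adding a new one.

```java
/** Hypothetical sketch of the two-stage slot-limit check with plain int inputs. */
final class SlotLimitCheckSketch {
    private SlotLimitCheckSketch() {}

    static boolean totalSlotNumExceedMaxLimitAfterRegister(
            int numRegisteredSlots,
            int numPendingSlots,
            int numReportedNewSlots,
            int numReportedMatchingPending,
            int maxSlotNum) {
        // Fast path: the limit holds even if no reported slot fulfils a pending one.
        if (numRegisteredSlots + numPendingSlots + numReportedNewSlots <= maxSlotNum) {
            return false;
        }
        // Reported slots that fulfil a pending slot replace it rather than adding to the total.
        final int totalSlotNum = numRegisteredSlots + numPendingSlots
            + (numReportedNewSlots - numReportedMatchingPending);
        // The suggested simplification: return the comparison directly.
        return totalSlotNum > maxSlotNum;
    }
}
```

For example, with 3 registered slots, 1 pending slot and 2 reported slots, one of which fulfils the pending request, the total after registration is 5. That passes a cap of 5 and fails a cap of 4.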

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -646,14 +658,52 @@ private TaskManagerSlot createAndRegisterTaskManagerSlot(SlotID slotId, Resource
+	private int getNumNonPendingReportedNewSlots(SlotReport slotReport, int numReportedNewSlots) {

Review comment:
       Is `numReportedNewSlots` always the same as `slotReport.getNumSlotStatus()`?
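If the two values are always equal, the helper could drop the extra parameter and derive the count from the report itself. The two values could then never disagree. The sketch below shows such a single-source helper. It assumes each reported slot can fulfil at most one pending slot with an exactly equal profile, mirroring `findExactlyMatchingPendingTaskManagerSlot`. `String` profiles and `List`s stand in for `ResourceProfile`, `SlotReport`, and the `pendingSlots` field, and the class name is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: String profiles stand in for ResourceProfile. */
final class NonPendingSlotCountSketch {
    private NonPendingSlotCountSketch() {}

    /**
     * Counts reported slots that would NOT fulfil a pending slot with an exactly equal profile.
     * The number of reported slots comes from the report itself, not a separate parameter.
     */
    static int getNumNonPendingReportedNewSlots(List<String> reportedProfiles, List<String> pendingProfiles) {
        // Multiset of pending profiles still available for matching.
        final Map<String, Integer> unmatchedPending = new HashMap<>();
        for (String profile : pendingProfiles) {
            unmatchedPending.merge(profile, 1, Integer::sum);
        }
        int matched = 0;
        for (String profile : reportedProfiles) {
            final Integer remaining = unmatchedPending.get(profile);
            if (remaining != null && remaining > 0) {
                // Each pending slot can be fulfilled by at most one reported slot.
                unmatchedPending.put(profile, remaining - 1);
                matched++;
            }
        }
        return reportedProfiles.size() - matched;
    }
}
```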




----------------------------------------------------------------
This is an automated message from the Apache Git Service.