You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/21 07:41:27 UTC

[flink] 04/04: [FLINK-9567][yarn] Before requesting new containers always check if it is required

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 806d3a424c0ad66f49282d0496599597d0f9f0c0
Author: yangshimin <ya...@youzan.com>
AuthorDate: Fri Sep 7 10:07:25 2018 +0800

    [FLINK-9567][yarn] Before requesting new containers always check if it is required
    
    By comparing the number of pending slot requests and the number of pending container allocations
    it is possible to say whether we should allocate more containers or not.
    
    This closes #6669.
---
 .../org/apache/flink/yarn/YarnResourceManager.java | 34 ++++++++++------------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index d051a72..6bc955f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -337,7 +337,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					if (yarnWorkerNode != null) {
 						// Container completed unexpectedly ~> start a new one
 						final Container container = yarnWorkerNode.getContainer();
-						internalRequestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
+						requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
 					}
 					// Eagerly close the connection with task manager.
 					closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
@@ -446,17 +446,24 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 		return new Tuple2<>(host, Integer.valueOf(port));
 	}
 
+	/**
+	 * Request new container if pending containers cannot satisfy pending slot requests.
+	 */
 	private void requestYarnContainer(Resource resource, Priority priority) {
-		resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
+		int pendingSlotRequests = getNumberPendingSlotRequests();
+		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
+		if (pendingSlotRequests > pendingSlotAllocation) {
+			resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
 
-		// make sure we transmit the request fast and receive fast news of granted allocations
-		resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+			// make sure we transmit the request fast and receive fast news of granted allocations
+			resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
 
-		numPendingContainerRequests++;
+			numPendingContainerRequests++;
 
-		log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
-			resource,
-			numPendingContainerRequests);
+			log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
+				resource,
+				numPendingContainerRequests);
+		}
 	}
 
 	private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
@@ -513,15 +520,4 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 			return priority;
 		}
 	}
-
-	/**
-	 * Request new container if pending containers cannot satisfies pending slot requests.
-	 */
-	private void internalRequestYarnContainer(Resource resource, Priority priority) {
-		int pendingSlotRequests = getNumberPendingSlotRequests();
-		int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
-		if (pendingSlotRequests > pendingSlotAllocation) {
-			requestYarnContainer(resource, priority);
-		}
-	}
 }