You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/09/21 07:41:27 UTC
[flink] 04/04: [FLINK-9567][yarn] Before requesting new containers
always check if it is required
This is an automated email from the ASF dual-hosted git repository.
trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 806d3a424c0ad66f49282d0496599597d0f9f0c0
Author: yangshimin <ya...@youzan.com>
AuthorDate: Fri Sep 7 10:07:25 2018 +0800
[FLINK-9567][yarn] Before requesting new containers always check if it is required
By comparing the number of pending slot requests and the number of pending container allocations
it is possible to say whether we should allocate more containers or not.
This closes #6669.
---
.../org/apache/flink/yarn/YarnResourceManager.java | 34 ++++++++++------------
1 file changed, 15 insertions(+), 19 deletions(-)
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index d051a72..6bc955f 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -337,7 +337,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
if (yarnWorkerNode != null) {
// Container completed unexpectedly ~> start a new one
final Container container = yarnWorkerNode.getContainer();
- internalRequestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
+ requestYarnContainer(container.getResource(), yarnWorkerNode.getContainer().getPriority());
}
// Eagerly close the connection with task manager.
closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
@@ -446,17 +446,24 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
return new Tuple2<>(host, Integer.valueOf(port));
}
+ /**
+ * Request new container if pending containers cannot satisfies pending slot requests.
+ */
private void requestYarnContainer(Resource resource, Priority priority) {
- resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
+ int pendingSlotRequests = getNumberPendingSlotRequests();
+ int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
+ if (pendingSlotRequests > pendingSlotAllocation) {
+ resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
- // make sure we transmit the request fast and receive fast news of granted allocations
- resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+ // make sure we transmit the request fast and receive fast news of granted allocations
+ resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
- numPendingContainerRequests++;
+ numPendingContainerRequests++;
- log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
- resource,
- numPendingContainerRequests);
+ log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
+ resource,
+ numPendingContainerRequests);
+ }
}
private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
@@ -513,15 +520,4 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
return priority;
}
}
-
- /**
- * Request new container if pending containers cannot satisfies pending slot requests.
- */
- private void internalRequestYarnContainer(Resource resource, Priority priority) {
- int pendingSlotRequests = getNumberPendingSlotRequests();
- int pendingSlotAllocation = numPendingContainerRequests * numberOfTaskSlots;
- if (pendingSlotRequests > pendingSlotAllocation) {
- requestYarnContainer(resource, priority);
- }
- }
}