You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2019/01/04 16:36:42 UTC
[flink] branch master updated: [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
This is an automated email from the ASF dual-hosted git repository.
shuyichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e26d90f [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
e26d90f is described below
commit e26d90fc86b266978b4bac84fe02ca34b62983fe
Author: Shuyi Chen <sh...@uber.com>
AuthorDate: Sat Nov 10 00:42:49 2018 -0800
[FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
This closes #7078
---
.../src/test/java/org/apache/flink/yarn/YarnTestBase.java | 2 ++
.../java/org/apache/flink/yarn/YarnFlinkResourceManager.java | 2 ++
.../src/main/java/org/apache/flink/yarn/YarnResourceManager.java | 3 ++-
.../java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java | 9 +++++++++
.../test/java/org/apache/flink/yarn/YarnResourceManagerTest.java | 4 ++++
5 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 3763f65..f1e6a3a 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -165,6 +165,8 @@ public abstract class YarnTestBase extends TestLogger {
YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
+ YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
+ "org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
// so we have to change the number of cores for testing.
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 8e686bb..3327505 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -438,6 +438,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
LOG.info("Received new container: {} - Remaining pending container requests: {}",
container.getId(), numPendingContainerRequests);
+ resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
+ container.getResource(), null, null, container.getPriority()));
// decide whether to return the container, or whether to start a TaskManager
if (numRegistered + containersInLaunch.size() < numRequired) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 6ff5cd6..6669f16 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -361,7 +361,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
"Received new container: {} - Remaining pending container requests: {}",
container.getId(),
numPendingContainerRequests);
-
+ resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
+ container.getResource(), null, null, container.getPriority()));
if (numPendingContainerRequests > 0) {
numPendingContainerRequests--;
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
index 10b2ce9..d665df6 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -69,8 +71,11 @@ import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -125,6 +130,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
1),
i));
when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
+ when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+ when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
containerList.add(mockContainer);
}
@@ -233,6 +240,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
+ verify(resourceManagerClient, times(numInitialTaskManagers)).removeContainerRequest(
+ any(AMRMClient.ContainerRequest.class));
assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
} finally {
if (resourceManager != null) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index d41d42d..ee325da 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -401,6 +401,8 @@ public class YarnResourceManagerTest extends TestLogger {
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+ verify(mockResourceManagerClient).removeContainerRequest(
+ any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
// Remote task executor registers with YarnResourceManager.
@@ -496,6 +498,8 @@ public class YarnResourceManagerTest extends TestLogger {
resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+ verify(mockResourceManagerClient).removeContainerRequest(
+ any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
// Callback from YARN when container is Completed, pending request can not be fulfilled by pending