You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by sh...@apache.org on 2019/01/05 08:47:27 UTC

[flink] branch release-1.6 updated: [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success

This is an automated email from the ASF dual-hosted git repository.

shuyichen pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 7cc4c6f  [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
7cc4c6f is described below

commit 7cc4c6f3e5e84efc067f2f2179648e31e5defa27
Author: Shuyi Chen <sh...@uber.com>
AuthorDate: Sat Nov 10 00:42:49 2018 -0800

    [FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success
    
    This closes #7078
---
 .../src/test/java/org/apache/flink/yarn/YarnTestBase.java        | 2 ++
 .../java/org/apache/flink/yarn/YarnFlinkResourceManager.java     | 2 ++
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java | 3 ++-
 .../java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java | 9 +++++++++
 .../test/java/org/apache/flink/yarn/YarnResourceManagerTest.java | 4 ++++
 5 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 1a0520f..bcc2980 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -167,6 +167,8 @@ public abstract class YarnTestBase extends TestLogger {
 		YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
 		YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
 		YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
+		YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
+				"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
 		// so we have to change the number of cores for testing.
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 8e686bb..3327505 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -438,6 +438,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
 			LOG.info("Received new container: {} - Remaining pending container requests: {}",
 				container.getId(), numPendingContainerRequests);
+			resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
+					container.getResource(), null, null, container.getPriority()));
 
 			// decide whether to return the container, or whether to start a TaskManager
 			if (numRegistered + containersInLaunch.size() < numRequired) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index ead0ac0..9d9d21b 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -357,7 +357,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					"Received new container: {} - Remaining pending container requests: {}",
 					container.getId(),
 					numPendingContainerRequests);
-
+				resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
+						container.getResource(), null, null, container.getPriority()));
 				if (numPendingContainerRequests > 0) {
 					numPendingContainerRequests--;
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
index 10b2ce9..d665df6 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -69,8 +71,11 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -125,6 +130,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
 							1),
 						i));
 				when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
+				when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
+				when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
 				containerList.add(mockContainer);
 			}
 
@@ -233,6 +240,8 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
 
 				int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
 
+				verify(resourceManagerClient, times(numInitialTaskManagers)).removeContainerRequest(
+						any(AMRMClient.ContainerRequest.class));
 				assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
 			} finally {
 				if (resourceManager != null) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 58d297d..8b58330 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -359,6 +359,8 @@ public class YarnResourceManagerTest extends TestLogger {
 			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
 			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
 			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+			verify(mockResourceManagerClient).removeContainerRequest(
+					any(AMRMClient.ContainerRequest.class));
 			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
 
 			// Remote task executor registers with YarnResourceManager.
@@ -465,6 +467,8 @@ public class YarnResourceManagerTest extends TestLogger {
 			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
 			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
 			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+			verify(mockResourceManagerClient).removeContainerRequest(
+					any(AMRMClient.ContainerRequest.class));
 			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
 
 			// Callback from YARN when container is Completed, pending request can not be fulfilled by pending