You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/08 13:20:13 UTC

[flink] branch release-1.6 updated: Revert "[FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success"

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.6 by this push:
     new 8affc90  Revert "[FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success"
8affc90 is described below

commit 8affc904d5d3f98fd464b783c30533d0fa516287
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Tue Jan 8 11:25:51 2019 +0100

    Revert "[FLINK-10848][YARN] properly remove YARN ContainerRequest upon container allocation success"
    
    This reverts commit 7cc4c6f3e5e84efc067f2f2179648e31e5defa27.
---
 .../src/test/java/org/apache/flink/yarn/YarnTestBase.java        | 2 --
 .../java/org/apache/flink/yarn/YarnFlinkResourceManager.java     | 2 --
 .../src/main/java/org/apache/flink/yarn/YarnResourceManager.java | 3 +--
 .../java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java | 9 ---------
 .../test/java/org/apache/flink/yarn/YarnResourceManagerTest.java | 4 ----
 5 files changed, 1 insertion(+), 19 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index bcc2980..1a0520f 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -167,8 +167,6 @@ public abstract class YarnTestBase extends TestLogger {
 		YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
 		YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
 		YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
-		YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
-				"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
 		// so we have to change the number of cores for testing.
 		YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
 	}
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 3327505..8e686bb 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -438,8 +438,6 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 			numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
 			LOG.info("Received new container: {} - Remaining pending container requests: {}",
 				container.getId(), numPendingContainerRequests);
-			resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
-					container.getResource(), null, null, container.getPriority()));
 
 			// decide whether to return the container, or whether to start a TaskManager
 			if (numRegistered + containersInLaunch.size() < numRequired) {
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
index 9d9d21b..ead0ac0 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -357,8 +357,7 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme
 					"Received new container: {} - Remaining pending container requests: {}",
 					container.getId(),
 					numPendingContainerRequests);
-				resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
-						container.getResource(), null, null, container.getPriority()));
+
 				if (numPendingContainerRequests > 0) {
 					numPendingContainerRequests--;
 
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
index d665df6..10b2ce9 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java
@@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
@@ -71,11 +69,8 @@ import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
 import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
@@ -130,8 +125,6 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
 							1),
 						i));
 				when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
-				when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
-				when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
 				containerList.add(mockContainer);
 			}
 
@@ -240,8 +233,6 @@ public class YarnFlinkResourceManagerTest extends TestLogger {
 
 				int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());
 
-				verify(resourceManagerClient, times(numInitialTaskManagers)).removeContainerRequest(
-						any(AMRMClient.ContainerRequest.class));
 				assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
 			} finally {
 				if (resourceManager != null) {
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 8b58330..58d297d 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -359,8 +359,6 @@ public class YarnResourceManagerTest extends TestLogger {
 			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
 			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
 			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockResourceManagerClient).removeContainerRequest(
-					any(AMRMClient.ContainerRequest.class));
 			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
 
 			// Remote task executor registers with YarnResourceManager.
@@ -467,8 +465,6 @@ public class YarnResourceManagerTest extends TestLogger {
 			when(testingContainer.getPriority()).thenReturn(Priority.UNDEFINED);
 			resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
 			verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
-			verify(mockResourceManagerClient).removeContainerRequest(
-					any(AMRMClient.ContainerRequest.class));
 			verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));
 
 			// Callback from YARN when container is Completed, pending request can not be fulfilled by pending