You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ey...@apache.org on 2018/07/27 00:13:06 UTC
[43/50] hadoop git commit: YARN-8546. Resource leak caused by a
reserved container being released more than once under async scheduling.
Contributed by Tao Yang.
YARN-8546. Resource leak caused by a reserved container being released more than once under async scheduling. Contributed by Tao Yang.
(Cherry-picked from commit 5be9f4a5d05c9cb99348719fe35626b1de3055db)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b89624a9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b89624a9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b89624a9
Branch: refs/remotes/origin/branch-3.1
Commit: b89624a943268e180e0e1532b3a394ff580a962c
Parents: 1396fa2
Author: Weiwei Yang <ww...@apache.org>
Authored: Wed Jul 25 17:35:27 2018 +0800
Committer: Weiwei Yang <ww...@apache.org>
Committed: Wed Jul 25 17:53:40 2018 +0800
----------------------------------------------------------------------
.../scheduler/common/fica/FiCaSchedulerApp.java | 15 ++++
.../TestCapacitySchedulerAsyncScheduling.java | 89 ++++++++++++++++++++
2 files changed, 104 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b89624a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 3b1b82c..9810e98 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -361,6 +361,21 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
.isEmpty()) {
for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
releaseContainer : allocation.getToRelease()) {
+ // Make sure to-release reserved containers are not outdated
+ if (releaseContainer.getRmContainer().getState()
+ == RMContainerState.RESERVED
+ && releaseContainer.getRmContainer() != releaseContainer
+ .getSchedulerNode().getReservedContainer()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to accept this proposal because "
+ + "it tries to release an outdated reserved container "
+ + releaseContainer.getRmContainer().getContainerId()
+ + " on node " + releaseContainer.getSchedulerNode().getNodeID()
+ + " whose reserved container is "
+ + releaseContainer.getSchedulerNode().getReservedContainer());
+ }
+ return false;
+ }
// Only consider non-reserved container (reserved container will
// not affect available resource of node) on the same node
if (releaseContainer.getRmContainer().getState()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b89624a9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
index 338b9f9..c2c1519 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -685,6 +687,93 @@ public class TestCapacitySchedulerAsyncScheduling {
rm.stop();
}
+
+ @Test(timeout = 60000)
+ public void testReleaseOutdatedReservedContainer() throws Exception {
+ /*
+ * Submit an application and get container_02 reserved on nm1, then
+ * submit two allocate proposals that both list the same reserved
+ * container_02 as their to-release container.
+ * The first proposal should be accepted; the second should be rejected
+ * because it tries to release an outdated reserved container.
+ */
+ MockRM rm1 = new MockRM();
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+ MockNM nm3 = rm1.registerNode("h3:1234", 8 * GB);
+ rm1.drainEvents();
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ LeafQueue defaultQueue = (LeafQueue) cs.getQueue("default");
+ SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+ SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
+ SchedulerNode sn3 = cs.getSchedulerNode(nm3.getNodeId());
+
+ // Submit an app to the default queue; its AM container is launched on nm1
+ RMApp app1 = rm1.submitApp(4 * GB, "app", "user", null, "default");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ Resource allocateResource = Resources.createResource(5 * GB);
+ am1.allocate("*", (int) allocateResource.getMemorySize(), 3, 0,
+ new ArrayList<ContainerId>(), "");
+ FiCaSchedulerApp schedulerApp1 =
+ cs.getApplicationAttempt(am1.getApplicationAttemptId());
+
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); // node update for nm1; one reservation is expected afterwards
+ Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+ Assert.assertEquals(9 * GB, // presumably 4 GB AM + 5 GB reserved container - TODO confirm
+ defaultQueue.getQueueResourceUsage().getUsed().getMemorySize());
+
+ RMContainer reservedContainer =
+ schedulerApp1.getReservedContainers().get(0);
+ ResourceCommitRequest allocateFromSameReservedContainerProposal1 = // allocate on sn2, release the reservation on sn1
+ createAllocateFromReservedProposal(3, allocateResource, schedulerApp1,
+ sn2, sn1, cs.getRMContext(), reservedContainer);
+ boolean tryCommitResult = cs.tryCommit(cs.getClusterResource(),
+ allocateFromSameReservedContainerProposal1, true);
+ Assert.assertTrue(tryCommitResult);
+ ResourceCommitRequest allocateFromSameReservedContainerProposal2 = // allocate on sn3, reusing the same reserved container
+ createAllocateFromReservedProposal(4, allocateResource, schedulerApp1,
+ sn3, sn1, cs.getRMContext(), reservedContainer);
+ tryCommitResult = cs.tryCommit(cs.getClusterResource(),
+ allocateFromSameReservedContainerProposal2, true);
+ Assert.assertFalse("This proposal should be rejected because "
+ + "it try to release an outdated reserved container", tryCommitResult);
+
+ rm1.close();
+ }
+
+ private ResourceCommitRequest createAllocateFromReservedProposal(
+ int containerId, Resource allocateResource, FiCaSchedulerApp schedulerApp,
+ SchedulerNode allocateNode, SchedulerNode reservedNode,
+ RMContext rmContext, RMContainer reservedContainer) { // builds a commit request: allocate on allocateNode, release reservedContainer
+ Container container = Container.newInstance( // new container with the given id, placed on allocateNode
+ ContainerId.newContainerId(schedulerApp.getApplicationAttemptId(), containerId),
+ allocateNode.getNodeID(), allocateNode.getHttpAddress(), allocateResource,
+ Priority.newInstance(0), null);
+ RMContainer rmContainer = new RMContainerImpl(container, SchedulerRequestKey
+ .create(ResourceRequest
+ .newInstance(Priority.newInstance(0), "*", allocateResource, 1)),
+ schedulerApp.getApplicationAttemptId(), allocateNode.getNodeID(), "user",
+ rmContext);
+ SchedulerContainer allocateContainer = // NOTE(review): last ctor arg is true here and false below - presumably an "allocated" flag; confirm
+ new SchedulerContainer(schedulerApp, allocateNode, rmContainer, "", true);
+ SchedulerContainer reservedSchedulerContainer = // wraps the existing reservation on reservedNode
+ new SchedulerContainer(schedulerApp, reservedNode, reservedContainer, "",
+ false);
+ List<SchedulerContainer> toRelease = new ArrayList<>(); // the reserved container is the only to-release entry
+ toRelease.add(reservedSchedulerContainer);
+ ContainerAllocationProposal allocateFromReservedProposal =
+ new ContainerAllocationProposal(allocateContainer, toRelease, null,
+ NodeType.OFF_SWITCH, NodeType.OFF_SWITCH,
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, allocateResource);
+ List<ContainerAllocationProposal> allocateProposals = new ArrayList<>();
+ allocateProposals.add(allocateFromReservedProposal);
+ return new ResourceCommitRequest(allocateProposals, null, null); // single proposal in the first list; the other two arguments are null
+ }
+
private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
if (nmHeartbeatThread != null) {
nmHeartbeatThread.setShouldStop();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org