You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/09/02 07:58:54 UTC
[42/50] [abbrv] hadoop git commit: YARN-4082. Container shouldn't be
killed when node's label updated. Contributed by Wangda Tan.
YARN-4082. Container shouldn't be killed when node's label updated. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bf669b6d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bf669b6d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bf669b6d
Branch: refs/heads/HDFS-7285
Commit: bf669b6d9f8ba165e30b8823218d625a49958925
Parents: f4d96be
Author: Varun Vasudev <vv...@apache.org>
Authored: Tue Sep 1 14:19:11 2015 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Tue Sep 1 14:19:11 2015 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/capacity/AbstractCSQueue.java | 27 ++
.../scheduler/capacity/CSQueue.java | 26 ++
.../scheduler/capacity/CapacityScheduler.java | 40 +--
.../scheduler/capacity/LeafQueue.java | 16 ++
.../scheduler/common/fica/FiCaSchedulerApp.java | 9 +
.../TestCapacitySchedulerNodeLabelUpdate.java | 249 ++++++++++++++++---
7 files changed, 314 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 80cf793..999654d 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -804,6 +804,9 @@ Release 2.8.0 - UNRELEASED
YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id
has not been reset synchronously. (Jun Gong via rohithsharmaks)
+ YARN-4082. Container shouldn't be killed when node's label updated.
+ (Wangda Tan via vvasudev)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 792c25c..0ae4d1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -543,6 +544,32 @@ public abstract class AbstractCSQueue implements CSQueue {
}
}
+ @Override
+ public void incUsedResource(String nodeLabel, Resource resourceToInc,
+ SchedulerApplicationAttempt application) {
+ if (nodeLabel == null) {
+ nodeLabel = RMNodeLabelsManager.NO_LABEL;
+ }
+ // ResourceUsage has its own lock, no additional lock needed here.
+ queueUsage.incUsed(nodeLabel, resourceToInc);
+ if (null != parent) {
+ parent.incUsedResource(nodeLabel, resourceToInc, null);
+ }
+ }
+
+ @Override
+ public void decUsedResource(String nodeLabel, Resource resourceToDec,
+ SchedulerApplicationAttempt application) {
+ if (nodeLabel == null) {
+ nodeLabel = RMNodeLabelsManager.NO_LABEL;
+ }
+ // ResourceUsage has its own lock, no additional lock needed here.
+ queueUsage.decUsed(nodeLabel, resourceToDec);
+ if (null != parent) {
+ parent.decUsedResource(nodeLabel, resourceToDec, null);
+ }
+ }
+
/**
* Return if the queue has pending resource on given nodePartition and
* schedulingMode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index b06a646..9855dd4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -287,4 +288,29 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* @return resourceUsage
*/
public ResourceUsage getQueueResourceUsage();
+
+ /**
+ * When the partition of a node is updated, we will update the queue's resource
+ * usage if it has container(s) running on that node.
+ */
+ public void incUsedResource(String nodePartition, Resource resourceToInc,
+ SchedulerApplicationAttempt application);
+
+ /**
+ * When the partition of a node is updated, we will update the queue's resource
+ * usage if it has container(s) running on that node.
+ */
+ public void decUsedResource(String nodePartition, Resource resourceToDec,
+ SchedulerApplicationAttempt application);
+
+ /**
+ * When an outstanding resource is fulfilled or canceled, calling this will
+ * decrease pending resource in a queue.
+ *
+ * @param nodeLabel
+ * asked by application
+ * @param resourceToDec
+ * new resource asked
+ */
+ public void decPendingResource(String nodeLabel, Resource resourceToDec);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cff1fe5..b5ccbd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1040,12 +1040,6 @@ public class CapacityScheduler extends
/**
* Process node labels update on a node.
- *
- * TODO: Currently capacity scheduler will kill containers on a node when
- * labels on the node changed. It is a simply solution to ensure guaranteed
- * capacity on labels of queues. When YARN-2498 completed, we can let
- * preemption policy to decide if such containers need to be killed or just
- * keep them running.
*/
private synchronized void updateLabelsOnNode(NodeId nodeId,
Set<String> newLabels) {
@@ -1060,17 +1054,31 @@ public class CapacityScheduler extends
return;
}
- // Kill running containers since label is changed
+ // Get new partition, we have only one partition per node
+ String newPartition;
+ if (newLabels.isEmpty()) {
+ newPartition = RMNodeLabelsManager.NO_LABEL;
+ } else {
+ newPartition = newLabels.iterator().next();
+ }
+
+ // old partition as well
+ String oldPartition = node.getPartition();
+
+ // Update resources of these containers
for (RMContainer rmContainer : node.getRunningContainers()) {
- ContainerId containerId = rmContainer.getContainerId();
- completedContainer(rmContainer,
- ContainerStatus.newInstance(containerId,
- ContainerState.COMPLETE,
- String.format(
- "Container=%s killed since labels on the node=%s changed",
- containerId.toString(), nodeId.toString()),
- ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
- RMContainerEventType.KILL);
+ FiCaSchedulerApp application =
+ getApplicationAttempt(rmContainer.getApplicationAttemptId());
+ if (null != application) {
+ application.nodePartitionUpdated(rmContainer, oldPartition,
+ newPartition);
+ } else {
+ LOG.warn("There's something wrong, some RMContainers running on"
+ + " a node, but we cannot find SchedulerApplicationAttempt for it. Node="
+ + node.getNodeID() + " applicationAttemptId="
+ + rmContainer.getApplicationAttemptId());
+ continue;
+ }
}
// Unreserve container on this node
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index ff1baff..658eae1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1262,6 +1262,22 @@ public class LeafQueue extends AbstractCSQueue {
}
}
+ @Override
+ public void incUsedResource(String nodeLabel, Resource resourceToInc,
+ SchedulerApplicationAttempt application) {
+ getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel,
+ resourceToInc);
+ super.incUsedResource(nodeLabel, resourceToInc, application);
+ }
+
+ @Override
+ public void decUsedResource(String nodeLabel, Resource resourceToDec,
+ SchedulerApplicationAttempt application) {
+ getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel,
+ resourceToDec);
+ super.decUsedResource(nodeLabel, resourceToDec, application);
+ }
+
@VisibleForTesting
public static class User {
ResourceUsage userResourceUsage = new ResourceUsage();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 74d77f5..300cba9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -443,4 +443,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
schedulingMode, currentResourceLimits, reservedContainer);
}
}
+
+ public void nodePartitionUpdated(RMContainer rmContainer, String oldPartition,
+ String newPartition) {
+ Resource containerResource = rmContainer.getAllocatedResource();
+ this.attemptResourceUsage.decUsed(oldPartition, containerResource);
+ this.attemptResourceUsage.incUsed(newPartition, containerResource);
+ getCSLeafQueue().decUsedResource(oldPartition, containerResource, this);
+ getCSLeafQueue().incUsedResource(newPartition, containerResource, this);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf669b6d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 0a701d8..94af4e0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -19,22 +19,29 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.junit.Assert;
import org.junit.Before;
@@ -97,8 +104,18 @@ public class TestCapacitySchedulerNodeLabelUpdate {
.getMemory());
}
+ private void checkUserUsedResource(MockRM rm, String queueName,
+ String userName, String partition, int memory) {
+ CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
+ LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName);
+ LeafQueue.User user = queue.getUser(userName);
+ Assert.assertEquals(memory,
+ user.getResourceUsage().getUsed(partition).getMemory());
+ }
+
@Test(timeout = 60000)
- public void testResourceUsage() throws Exception {
+ public void testRequestContainerAfterNodePartitionUpdated()
+ throws Exception {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y",
"z"));
@@ -160,7 +177,8 @@ public class TestCapacitySchedulerNodeLabelUpdate {
}
@Test (timeout = 60000)
- public void testNodeUpdate() throws Exception {
+ public void testResourceUsageWhenNodeUpdatesPartition()
+ throws Exception {
// set node -> label
mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
@@ -183,8 +201,9 @@ public class TestCapacitySchedulerNodeLabelUpdate {
MockNM nm1 = rm.registerNode("h1:1234", 8000);
MockNM nm2 = rm.registerNode("h2:1234", 8000);
MockNM nm3 = rm.registerNode("h3:1234", 8000);
-
- ContainerId containerId;
+
+ ContainerId containerId1;
+ ContainerId containerId2;
// launch an app to queue a1 (label = x), and check all container will
// be allocated in h1
@@ -193,9 +212,9 @@ public class TestCapacitySchedulerNodeLabelUpdate {
// request a container.
am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
- Assert.assertTrue(rm.waitForState(nm1, containerId,
+ containerId1 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+ containerId2 = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ Assert.assertTrue(rm.waitForState(nm1, containerId2,
RMContainerState.ALLOCATED, 10 * 1000));
// check used resource:
@@ -203,55 +222,205 @@ public class TestCapacitySchedulerNodeLabelUpdate {
checkUsedResource(rm, "a", 1024, "x");
checkUsedResource(rm, "a", 1024);
- // change h1's label to z, container should be killed
- mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
- toSet("z")));
- Assert.assertTrue(rm.waitForState(nm1, containerId,
- RMContainerState.KILLED, 10 * 1000));
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ FiCaSchedulerApp app = cs.getApplicationAttempt(am1.getApplicationAttemptId());
- // check used resource:
- // queue-a used x=0G, ""=1G ("" not changed)
+ // change h1's label to z
+ cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+ toSet("z"))));
checkUsedResource(rm, "a", 0, "x");
+ checkUsedResource(rm, "a", 1024, "z");
checkUsedResource(rm, "a", 1024);
+ checkUsedResource(rm, "root", 0, "x");
+ checkUsedResource(rm, "root", 1024, "z");
+ checkUsedResource(rm, "root", 1024);
+ checkUserUsedResource(rm, "a", "user", "x", 0);
+ checkUserUsedResource(rm, "a", "user", "z", 1024);
+ Assert.assertEquals(0,
+ app.getAppAttemptResourceUsage().getUsed("x").getMemory());
+ Assert.assertEquals(1024,
+ app.getAppAttemptResourceUsage().getUsed("z").getMemory());
- // request a container with label = y
- am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "y");
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
- Assert.assertTrue(rm.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
-
- // check used resource:
- // queue-a used y=1G, ""=1G
+ // change h1's label to y
+ cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+ toSet("y"))));
+ checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 1024, "y");
+ checkUsedResource(rm, "a", 0, "z");
checkUsedResource(rm, "a", 1024);
+ checkUsedResource(rm, "root", 0, "x");
+ checkUsedResource(rm, "root", 1024, "y");
+ checkUsedResource(rm, "root", 0, "z");
+ checkUsedResource(rm, "root", 1024);
+ checkUserUsedResource(rm, "a", "user", "x", 0);
+ checkUserUsedResource(rm, "a", "user", "y", 1024);
+ checkUserUsedResource(rm, "a", "user", "z", 0);
+ Assert.assertEquals(0,
+ app.getAppAttemptResourceUsage().getUsed("x").getMemory());
+ Assert.assertEquals(1024,
+ app.getAppAttemptResourceUsage().getUsed("y").getMemory());
+ Assert.assertEquals(0,
+ app.getAppAttemptResourceUsage().getUsed("z").getMemory());
- // change h2's label to no label, container should be killed
- mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h2", 0),
- CommonNodeLabelsManager.EMPTY_STRING_SET));
- Assert.assertTrue(rm.waitForState(nm1, containerId,
- RMContainerState.KILLED, 10 * 1000));
+ // change h1's label to no label
+ Set<String> emptyLabels = new HashSet<>();
+ Map<NodeId,Set<String>> map = ImmutableMap.of(nm1.getNodeId(),
+ emptyLabels);
+ cs.handle(new NodeLabelsUpdateSchedulerEvent(map));
+ checkUsedResource(rm, "a", 0, "x");
+ checkUsedResource(rm, "a", 0, "y");
+ checkUsedResource(rm, "a", 0, "z");
+ checkUsedResource(rm, "a", 2048);
+ checkUsedResource(rm, "root", 0, "x");
+ checkUsedResource(rm, "root", 0, "y");
+ checkUsedResource(rm, "root", 0, "z");
+ checkUsedResource(rm, "root", 2048);
+ checkUserUsedResource(rm, "a", "user", "x", 0);
+ checkUserUsedResource(rm, "a", "user", "y", 0);
+ checkUserUsedResource(rm, "a", "user", "z", 0);
+ checkUserUsedResource(rm, "a", "user", "", 2048);
+ Assert.assertEquals(0,
+ app.getAppAttemptResourceUsage().getUsed("x").getMemory());
+ Assert.assertEquals(0,
+ app.getAppAttemptResourceUsage().getUsed("y").getMemory());
+ Assert.assertEquals(0,
+ app.getAppAttemptResourceUsage().getUsed("z").getMemory());
+ Assert.assertEquals(2048,
+ app.getAppAttemptResourceUsage().getUsed("").getMemory());
+
+ // Finish the two containers, we should see used resource becomes 0
+ cs.completedContainer(cs.getRMContainer(containerId2),
+ ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+ RMContainerEventType.KILL);
+ cs.completedContainer(cs.getRMContainer(containerId1),
+ ContainerStatus.newInstance(containerId1, ContainerState.COMPLETE, "",
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+ RMContainerEventType.KILL);
- // check used resource:
- // queue-a used x=0G, y=0G, ""=1G ("" not changed)
checkUsedResource(rm, "a", 0, "x");
checkUsedResource(rm, "a", 0, "y");
- checkUsedResource(rm, "a", 1024);
+ checkUsedResource(rm, "a", 0, "z");
+ checkUsedResource(rm, "a", 0);
+ checkUsedResource(rm, "root", 0, "x");
+ checkUsedResource(rm, "root", 0, "y");
+ checkUsedResource(rm, "root", 0, "z");
+ checkUsedResource(rm, "root", 0);
+ checkUserUsedResource(rm, "a", "user", "x", 0);
+ checkUserUsedResource(rm, "a", "user", "y", 0);
+ checkUserUsedResource(rm, "a", "user", "z", 0);
+ checkUserUsedResource(rm, "a", "user", "", 0);
+
+ rm.close();
+ }
+
+
+ @Test (timeout = 60000)
+ public void testComplexResourceUsageWhenNodeUpdatesPartition()
+ throws Exception {
+ /*
+ * This test is similar to testResourceUsageWhenNodeUpdatesPartition, this
+ * will include multiple applications, multiple users and multiple
+ * containers running on a single node, size of each container is 1G
+ *
+ * Node 1
+ * ------
+ * App1-container3
+ * App2-container2
+ * App2-Container3
+ *
+ * Node 2
+ * ------
+ * App2-container1
+ * App1-container1
+ * App1-container2
+ */
+ // set node -> label
+ mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x", "y", "z"));
+ // set mapping:
+ // h1 -> x
+ // h2 -> y
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm = new MockRM(getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm.getRMContext().setNodeLabelManager(mgr);
+ rm.start();
+ MockNM nm1 = rm.registerNode("h1:1234", 80000);
+ MockNM nm2 = rm.registerNode("h2:1234", 80000);
+
+ // app1
+ RMApp app1 = rm.submitApp(GB, "app", "u1", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+
+ // c2 on n1, c3 on n2
+ am1.allocate("*", GB, 1, new ArrayList<ContainerId>(), "x");
+ ContainerId containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ Assert.assertTrue(rm.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ am1.allocate("*", GB, 1, new ArrayList<ContainerId>());
containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+ Assert.assertTrue(rm.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
- // change h3's label to z, AM container should be killed
- mgr.replaceLabelsOnNode(ImmutableMap.of(NodeId.newInstance("h3", 0),
- toSet("z")));
+ // app2
+ RMApp app2 = rm.submitApp(GB, "app", "u2", null, "a");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+
+ // c2/c3 on n1
+ am2.allocate("*", GB, 2, new ArrayList<ContainerId>(), "x");
+ containerId =
+ ContainerId.newContainerId(am2.getApplicationAttemptId(), 3);
Assert.assertTrue(rm.waitForState(nm1, containerId,
- RMContainerState.KILLED, 10 * 1000));
+ RMContainerState.ALLOCATED, 10 * 1000));
// check used resource:
- // queue-a used x=0G, y=0G, ""=1G ("" not changed)
+ // queue-a used x=3G, ""=3G
+ checkUsedResource(rm, "a", 3 * GB, "x");
+ checkUsedResource(rm, "a", 3 * GB);
+
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ FiCaSchedulerApp application1 =
+ cs.getApplicationAttempt(am1.getApplicationAttemptId());
+ FiCaSchedulerApp application2 =
+ cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+ // change h1's label to z
+ cs.handle(new NodeLabelsUpdateSchedulerEvent(ImmutableMap.of(nm1.getNodeId(),
+ toSet("z"))));
checkUsedResource(rm, "a", 0, "x");
- checkUsedResource(rm, "a", 0, "y");
- checkUsedResource(rm, "a", 0);
+ checkUsedResource(rm, "a", 3 * GB, "z");
+ checkUsedResource(rm, "a", 3 * GB);
+ checkUsedResource(rm, "root", 0, "x");
+ checkUsedResource(rm, "root", 3 * GB, "z");
+ checkUsedResource(rm, "root", 3 * GB);
+ checkUserUsedResource(rm, "a", "u1", "x", 0 * GB);
+ checkUserUsedResource(rm, "a", "u1", "z", 1 * GB);
+ checkUserUsedResource(rm, "a", "u1", "", 2 * GB);
+ checkUserUsedResource(rm, "a", "u2", "x", 0 * GB);
+ checkUserUsedResource(rm, "a", "u2", "z", 2 * GB);
+ checkUserUsedResource(rm, "a", "u2", "", 1 * GB);
+ Assert.assertEquals(0,
+ application1.getAppAttemptResourceUsage().getUsed("x").getMemory());
+ Assert.assertEquals(1 * GB,
+ application1.getAppAttemptResourceUsage().getUsed("z").getMemory());
+ Assert.assertEquals(2 * GB,
+ application1.getAppAttemptResourceUsage().getUsed("").getMemory());
+ Assert.assertEquals(0,
+ application2.getAppAttemptResourceUsage().getUsed("x").getMemory());
+ Assert.assertEquals(2 * GB,
+ application2.getAppAttemptResourceUsage().getUsed("z").getMemory());
+ Assert.assertEquals(1 * GB,
+ application2.getAppAttemptResourceUsage().getUsed("").getMemory());
rm.close();
}