You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2016/12/02 22:55:09 UTC

hadoop git commit: YARN-4934. Reserved Resource for QueueMetrics needs to be handled correctly in few cases. (Sunil G via wangda) (cherry picked from commit fdc46bfb37776d8c41b68f6c33a2379d0f329994)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 3232f5ffe -> a87abf9b6


YARN-4934. Reserved Resource for QueueMetrics needs to be handled correctly in few cases. (Sunil G via wangda)
(cherry picked from commit fdc46bfb37776d8c41b68f6c33a2379d0f329994)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a87abf9b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a87abf9b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a87abf9b

Branch: refs/heads/branch-2.8
Commit: a87abf9b69bf5db22c495cade20de3d6bcbd1849
Parents: 3232f5f
Author: Wangda Tan <wa...@apache.org>
Authored: Sat Apr 16 22:47:41 2016 -0700
Committer: Eric Payne <ep...@apache.org>
Committed: Fri Dec 2 22:53:19 2016 +0000

----------------------------------------------------------------------
 .../scheduler/capacity/LeafQueue.java           |   7 -
 .../scheduler/common/fica/FiCaSchedulerApp.java |   2 +
 .../capacity/TestContainerAllocation.java       | 188 ++++++++++++++++++-
 3 files changed, 189 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87abf9b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 2a9c006..20378bb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -1441,13 +1441,6 @@ public class LeafQueue extends AbstractCSQueue {
         // Book-keeping
         if (removed) {
 
-          // track reserved resource for metrics, for normal container
-          // getReservedResource will be null.
-          Resource reservedRes = rmContainer.getReservedResource();
-          if (reservedRes != null && !reservedRes.equals(Resources.none())) {
-            decReservedResource(node.getPartition(), reservedRes);
-          }
-
           // Inform the ordering policy
           orderingPolicy.containerReleased(application, rmContainer);
           

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87abf9b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index deb06a5..595b3fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -248,6 +248,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       // Update reserved metrics
       queue.getMetrics().unreserveResource(getUser(),
           rmContainer.getReservedResource());
+      queue.decReservedResource(node.getPartition(),
+          rmContainer.getReservedResource());
       return true;
     }
     return false;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a87abf9b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 3a6ca82..8d9d0c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.security.SecurityUtilTestHelper;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -37,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -50,8 +53,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -418,5 +426,183 @@ public class TestContainerAllocation {
 
     rm1.close();
   }
-  
+
+  @Test(timeout = 60000)
+  public void testAllocationForReservedContainer() throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to a queue. And there's one
+     * node with 8G resource in the cluster. App1 allocates a 6G container, Then
+     * app2 asks for a 4G container. App2's request will be reserved on the
+     * node.
+     *
+     * Before next node heartbeat, app1 container is completed/killed. So app1
+     * container which was reserved will be allocated.
+     */
+    // inject node label manager
+    MockRM rm1 = new MockRM();
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app to queue, AM container should be launched in nm1
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+    am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
+
+    // Do node heartbeats 2 times
+    // First time will allocate container for app1, second time will reserve
+    // container for app2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // App2 will get preference to be allocated on node1, and node1 will be all
+    // used by App2.
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Check if a 4G container allocated for app1, and nothing allocated for app2
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
+
+    // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Usage of queue = 4G + 2 * 1G + 4G (reserved)
+    Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    // Mark one app1 container as killed/completed and re-kick RM
+    for (RMContainer container : schedulerApp1.getLiveContainers()) {
+      if (container.isAMContainer()) {
+        continue;
+      }
+      cs.markContainerForKillable(container);
+    }
+    // Cancel asks of app1 and re-kick RM
+    am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Check 4G container cancelled for app1, and one container allocated for
+    // app2
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
+
+    // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Usage of queue = 4G + 2 * 1G
+    Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    rm1.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testReservedContainerMetricsOnDecommisionedNode() throws Exception {
+    /**
+     * Test case: Submit two application (app1/app2) to a queue. And there's one
+     * node with 8G resource in the cluster. App1 allocates a 6G container, Then
+     * app2 asks for a 4G container. App2's request will be reserved on the
+     * node.
+     *
+     * Before next node heartbeat, app1 container is completed/killed. So app1
+     * container which was reserved will be allocated.
+     */
+    // inject node label manager
+    MockRM rm1 = new MockRM();
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app to queue, AM container should be launched in nm1
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+    am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
+
+    // Do node heartbeats 2 times
+    // First time will allocate container for app1, second time will reserve
+    // container for app2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // App2 will get preference to be allocated on node1, and node1 will be all
+    // used by App2.
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Check if a 4G container allocated for app1, and nothing allocated for app2
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
+
+    // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Usage of queue = 4G + 2 * 1G + 4G (reserved)
+    Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    // Remove the node
+    cs.handle(new NodeRemovedSchedulerEvent(rmNode1));
+
+    // Check all container cancelled for app1 and app2
+    Assert.assertEquals(0, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(0, schedulerApp2.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
+
+    // Usage and Reserved capacity of queue is 0
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    rm1.close();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org