You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2021/07/29 15:38:00 UTC

[hadoop] branch trunk updated: YARN-10663. Add runningApps stats in SLS. Contributed by Vadaga Ananyo Rao

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 74770c8  YARN-10663. Add runningApps stats in SLS. Contributed by Vadaga Ananyo Rao
74770c8 is described below

commit 74770c8a16ecd427d7d4e15b797bcb3ad068ff40
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Thu Jul 29 17:37:40 2021 +0200

    YARN-10663. Add runningApps stats in SLS. Contributed by Vadaga Ananyo Rao
---
 hadoop-tools/hadoop-sls/pom.xml                    |   5 ++
 .../hadoop/yarn/sls/appmaster/AMSimulator.java     |  14 +++
 .../hadoop/yarn/sls/appmaster/DAGAMSimulator.java  |   3 +-
 .../hadoop/yarn/sls/appmaster/MRAMSimulator.java   |   6 +-
 .../yarn/sls/appmaster/StreamAMSimulator.java      |   3 +-
 .../hadoop/yarn/sls/nodemanager/NMSimulator.java   |  21 ++++-
 .../yarn/sls/resourcemanager/MockAMLauncher.java   |   3 +-
 .../hadoop/yarn/sls/appmaster/TestAMSimulator.java |  29 ++++++
 .../yarn/sls/nodemanager/TestNMSimulator.java      | 100 ++++++++++++++++++++-
 9 files changed, 176 insertions(+), 8 deletions(-)

diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index 5bb5a41..208cbdf 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -73,6 +73,11 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 1330e4d..5315eaa 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -25,7 +25,9 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -118,6 +121,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
 
   private Map<ApplicationId, AMSimulator> appIdToAMSim;
 
+  private final Set<NodeId> ranNodes = new ConcurrentSkipListSet<>();
+
   public AMSimulator() {
     this.responseQueue = new LinkedBlockingQueue<>();
   }
@@ -236,6 +241,11 @@ public abstract class AMSimulator extends TaskRunner.Task {
       LOG.info("AM container is null");
     }
 
+    // Clear runningApps for ranNodes of this app
+    for (NodeId nodeId : ranNodes) {
+      se.getNmMap().get(nodeId).finishApplication(getApplicationId());
+    }
+
     if (null == appAttemptId) {
       // If appAttemptId == null, AM is not launched from RM's perspective, so
       // it's unnecessary to finish am as well
@@ -497,4 +507,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
   public ApplicationAttemptId getApplicationAttemptId() {
     return appAttemptId;
   }
+
+  public Set<NodeId> getRanNodes() {
+    return ranNodes;
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
index f886a69..c67544e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
@@ -189,7 +189,8 @@ public class DAGAMSimulator extends AMSimulator {
               appId, container.getId());
           assignedContainers.put(container.getId(), cs);
           se.getNmMap().get(container.getNodeId())
-              .addNewContainer(container, cs.getLifeTime());
+              .addNewContainer(container, cs.getLifeTime(), appId);
+          getRanNodes().add(container.getNodeId());
         }
       }
     }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 586c671..184fdca 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -231,14 +231,16 @@ public class MRAMSimulator extends AMSimulator {
               appId, container.getId());
           assignedMaps.put(container.getId(), cs);
           se.getNmMap().get(container.getNodeId())
-                  .addNewContainer(container, cs.getLifeTime());
+              .addNewContainer(container, cs.getLifeTime(), appId);
+          getRanNodes().add(container.getNodeId());
         } else if (! this.scheduledReduces.isEmpty()) {
           ContainerSimulator cs = scheduledReduces.remove();
           LOG.debug("Application {} starts to launch a reducer ({}).",
               appId, container.getId());
           assignedReduces.put(container.getId(), cs);
           se.getNmMap().get(container.getNodeId())
-                  .addNewContainer(container, cs.getLifeTime());
+              .addNewContainer(container, cs.getLifeTime(), appId);
+          getRanNodes().add(container.getNodeId());
         }
       }
     }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index 46bc90a..7e35451 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -172,7 +172,8 @@ public class StreamAMSimulator extends AMSimulator {
               container.getId());
           assignedStreams.put(container.getId(), cs);
           se.getNmMap().get(container.getNodeId()).addNewContainer(container,
-              cs.getLifeTime());
+              cs.getLifeTime(), appId);
+          getRanNodes().add(container.getNodeId());
         }
       }
     }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
index 2ec3976..54311d5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
@@ -250,7 +250,8 @@ public class NMSimulator extends TaskRunner.Task {
   /**
    * launch a new container with the given life time
    */
-  public void addNewContainer(Container container, long lifeTimeMS) {
+  public void addNewContainer(Container container, long lifeTimeMS,
+      ApplicationId applicationId) {
     LOG.debug("NodeManager {} launches a new container ({}).",
         node.getNodeID(), container.getId());
     if (lifeTimeMS != -1) {
@@ -267,6 +268,15 @@ public class NMSimulator extends TaskRunner.Task {
         amContainerList.add(container.getId());
       }
     }
+
+    // update runningApplications on the node
+    if (applicationId != null
+        && !getNode().getRunningApps().contains(applicationId)) {
+      getNode().getRunningApps().add(applicationId);
+    }
+    LOG.debug("Adding running app: {} on node: {}. " +
+            "Updated runningApps on this node are: {}",
+        applicationId, getNode().getNodeID(), getNode().getRunningApps());
   }
 
   /**
@@ -296,4 +306,13 @@ public class NMSimulator extends TaskRunner.Task {
   List<ContainerId> getCompletedContainers() {
     return completedContainerList;
   }
+
+  public void finishApplication(ApplicationId applicationId) {
+    // remove() returns whether the app was present; no separate contains().
+    if (getNode().getRunningApps().remove(applicationId)) {
+      LOG.debug("Removed running app: {} from node: {}. " +
+              "Updated runningApps on this node are: {}",
+          applicationId, getNode().getNodeID(), getNode().getRunningApps());
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
index 37bf96a..d284076 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -104,7 +104,8 @@ public class MockAMLauncher extends ApplicationMasterLauncher
         LOG.info("Notify AM launcher launched:" + amContainer.getId());
 
         se.getNmMap().get(amContainer.getNodeId())
-            .addNewContainer(amContainer, -1);
+            .addNewContainer(amContainer, -1, appId);
+        ams.getRanNodes().add(amContainer.getNodeId());
         return;
       } catch (Exception e) {
         throw new YarnRuntimeException(e);
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index ec7c81d..50ac700 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -32,7 +33,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.sls.SLSRunner;
 import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
 import org.apache.hadoop.yarn.sls.scheduler.*;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
@@ -41,6 +44,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.nio.file.Files;
@@ -50,8 +54,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
+import static org.mockito.Mockito.when;
+
 @RunWith(Parameterized.class)
 public class TestAMSimulator {
   private ResourceManager rm;
@@ -288,6 +295,28 @@ public class TestAMSimulator {
     Assert.assertEquals(3, nodeRequestCount);
   }
 
+  @Test
+  public void testAMSimulatorRanNodesCleared() throws Exception {
+    NMSimulator nm = new NMSimulator();
+    nm.init("/rack1/testNode1", Resources.createResource(1024 * 10, 10), 0, 1000,
+        rm, -1f);
+
+    Map<NodeId, NMSimulator> nmMap = new HashMap<>();
+    nmMap.put(nm.getNode().getNodeID(), nm);
+
+    MockAMSimulator app = new MockAMSimulator();
+    app.appId = ApplicationId.newInstance(0L, 1);
+    SLSRunner slsRunner = Mockito.mock(SLSRunner.class);
+    app.se = slsRunner;
+    when(slsRunner.getNmMap()).thenReturn(nmMap);
+    app.getRanNodes().add(nm.getNode().getNodeID());
+    nm.getNode().getRunningApps().add(app.appId);
+    Assert.assertTrue(nm.getNode().getRunningApps().contains(app.appId));
+    // lastStep() is expected to drop the app from every node in ranNodes.
+    app.lastStep();
+    Assert.assertFalse(nm.getNode().getRunningApps().contains(app.appId));
+    Assert.assertTrue(nm.getNode().getRunningApps().isEmpty());
+  }
 
   @After
   public void tearDown() {
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
index 003417e..f82ce91 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.sls.nodemanager;
 
 import java.util.function.Supplier;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -109,7 +111,7 @@ public class TestNMSimulator {
     ContainerId cId1 = newContainerId(1, 1, 1);
     Container container1 = Container.newInstance(cId1, null, null,
         Resources.createResource(GB, 1), null, null);
-    node1.addNewContainer(container1, 100000l);
+    node1.addNewContainer(container1, 100000l, null);
     Assert.assertTrue("Node1 should have one running container.",
         node1.getRunningContainers().containsKey(cId1));
 
@@ -117,7 +119,7 @@ public class TestNMSimulator {
     ContainerId cId2 = newContainerId(2, 1, 1);
     Container container2 = Container.newInstance(cId2, null, null,
         Resources.createResource(GB, 1), null, null);
-    node1.addNewContainer(container2, -1l);
+    node1.addNewContainer(container2, -1l, null);
     Assert.assertTrue("Node1 should have one running AM container",
         node1.getAMContainers().contains(cId2));
 
@@ -137,6 +139,100 @@ public class TestNMSimulator {
             appAttemptId), cId);
   }
 
+  @Test
+  public void testNMSimAppAddedAndRemoved() throws Exception {
+    // Register one node and wait until the RM reports it with free capacity.
+    NMSimulator node = new NMSimulator();
+    node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
+        rm, -1f);
+    node.middleStep();
+
+    int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
+    int cumulativeSleepTime = 0;
+    int sleepInterval = 100;
+
+    while (numClusterNodes != 1 && cumulativeSleepTime < 5000) {
+      Thread.sleep(sleepInterval);
+      cumulativeSleepTime += sleepInterval;
+      numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
+    }
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return rm.getResourceScheduler().getRootQueueMetrics()
+            .getAvailableMB() > 0;
+      }
+    }, 500, 10000);
+
+    Assert.assertEquals("Node should have no runningApps.",
+        0, node.getNode().getRunningApps().size());
+
+    // Allocate one app container on node; its appId must become a running app.
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+    ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 1);
+    Container container = Container.newInstance(cId, null, null,
+        Resources.createResource(GB, 1), null, null);
+    node.addNewContainer(container, 100000L, appId);
+    Assert.assertTrue("Node should have app: "
+            + appId + " in runningApps list.",
+        node.getNode().getRunningApps().contains(appId));
+
+    // Finish the app on the node; it must be removed from runningApps.
+    node.finishApplication(appId);
+    Assert.assertFalse("Node should not have app: "
+            + appId + " in runningApps list.",
+        node.getNode().getRunningApps().contains(appId));
+    Assert.assertEquals("Node should have no runningApps.",
+        0, node.getNode().getRunningApps().size());
+  }
+
+  @Test
+  public void testNMSimNullAppAddedAndRemoved() throws Exception {
+    // Register one node and wait until the RM reports it with free capacity.
+    NMSimulator node = new NMSimulator();
+    node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
+        rm, -1f);
+    node.middleStep();
+
+    int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
+    int cumulativeSleepTime = 0;
+    int sleepInterval = 100;
+
+    while (numClusterNodes != 1 && cumulativeSleepTime < 5000) {
+      Thread.sleep(sleepInterval);
+      cumulativeSleepTime += sleepInterval;
+      numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
+    }
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        return rm.getResourceScheduler().getRootQueueMetrics()
+            .getAvailableMB() > 0;
+      }
+    }, 500, 10000);
+
+    Assert.assertEquals("Node should have no runningApps.",
+        0, node.getNode().getRunningApps().size());
+
+    // A container launched with a null appId must not add a running app.
+    ContainerId cId = newContainerId(1, 1, 1);
+    Container container = Container.newInstance(cId, null, null,
+        Resources.createResource(GB, 1), null, null);
+    node.addNewContainer(container, 100000L, null);
+    Assert.assertEquals("Node should have no runningApps if appId is null.",
+        0, node.getNode().getRunningApps().size());
+
+    // Finishing an app the node never ran must leave runningApps unchanged.
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    node.finishApplication(appId);
+    Assert.assertEquals("Node should have no runningApps.",
+        0, node.getNode().getRunningApps().size());
+  }
+
   @After
   public void tearDown() throws Exception {
     rm.stop();

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org