You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the Apache mailing-list archive.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2021/07/29 15:38:00 UTC
[hadoop] branch trunk updated: YARN-10663. Add runningApps stats in
SLS. Contributed by Vadaga Ananyo Rao
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 74770c8 YARN-10663. Add runningApps stats in SLS. Contributed by Vadaga Ananyo Rao
74770c8 is described below
commit 74770c8a16ecd427d7d4e15b797bcb3ad068ff40
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Thu Jul 29 17:37:40 2021 +0200
YARN-10663. Add runningApps stats in SLS. Contributed by Vadaga Ananyo Rao
---
hadoop-tools/hadoop-sls/pom.xml | 5 ++
.../hadoop/yarn/sls/appmaster/AMSimulator.java | 14 +++
.../hadoop/yarn/sls/appmaster/DAGAMSimulator.java | 3 +-
.../hadoop/yarn/sls/appmaster/MRAMSimulator.java | 6 +-
.../yarn/sls/appmaster/StreamAMSimulator.java | 3 +-
.../hadoop/yarn/sls/nodemanager/NMSimulator.java | 21 ++++-
.../yarn/sls/resourcemanager/MockAMLauncher.java | 3 +-
.../hadoop/yarn/sls/appmaster/TestAMSimulator.java | 29 ++++++
.../yarn/sls/nodemanager/TestNMSimulator.java | 100 ++++++++++++++++++++-
9 files changed, 176 insertions(+), 8 deletions(-)
diff --git a/hadoop-tools/hadoop-sls/pom.xml b/hadoop-tools/hadoop-sls/pom.xml
index 5bb5a41..208cbdf 100644
--- a/hadoop-tools/hadoop-sls/pom.xml
+++ b/hadoop-tools/hadoop-sls/pom.xml
@@ -73,6 +73,11 @@
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
index 1330e4d..5315eaa 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/AMSimulator.java
@@ -25,7 +25,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -118,6 +121,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
private Map<ApplicationId, AMSimulator> appIdToAMSim;
+ private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
+
public AMSimulator() {
this.responseQueue = new LinkedBlockingQueue<>();
}
@@ -236,6 +241,11 @@ public abstract class AMSimulator extends TaskRunner.Task {
LOG.info("AM container is null");
}
+ // Clear runningApps for ranNodes of this app
+ for (NodeId nodeId : ranNodes) {
+ se.getNmMap().get(nodeId).finishApplication(getApplicationId());
+ }
+
if (null == appAttemptId) {
// If appAttemptId == null, AM is not launched from RM's perspective, so
// it's unnecessary to finish am as well
@@ -497,4 +507,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
public ApplicationAttemptId getApplicationAttemptId() {
return appAttemptId;
}
+
+ public Set<NodeId> getRanNodes() {
+ return this.ranNodes;
+ }
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
index f886a69..c67544e 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/DAGAMSimulator.java
@@ -189,7 +189,8 @@ public class DAGAMSimulator extends AMSimulator {
appId, container.getId());
assignedContainers.put(container.getId(), cs);
se.getNmMap().get(container.getNodeId())
- .addNewContainer(container, cs.getLifeTime());
+ .addNewContainer(container, cs.getLifeTime(), appId);
+ getRanNodes().add(container.getNodeId());
}
}
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
index 586c671..184fdca 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/MRAMSimulator.java
@@ -231,14 +231,16 @@ public class MRAMSimulator extends AMSimulator {
appId, container.getId());
assignedMaps.put(container.getId(), cs);
se.getNmMap().get(container.getNodeId())
- .addNewContainer(container, cs.getLifeTime());
+ .addNewContainer(container, cs.getLifeTime(), appId);
+ getRanNodes().add(container.getNodeId());
} else if (! this.scheduledReduces.isEmpty()) {
ContainerSimulator cs = scheduledReduces.remove();
LOG.debug("Application {} starts to launch a reducer ({}).",
appId, container.getId());
assignedReduces.put(container.getId(), cs);
se.getNmMap().get(container.getNodeId())
- .addNewContainer(container, cs.getLifeTime());
+ .addNewContainer(container, cs.getLifeTime(), appId);
+ getRanNodes().add(container.getNodeId());
}
}
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
index 46bc90a..7e35451 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/appmaster/StreamAMSimulator.java
@@ -172,7 +172,8 @@ public class StreamAMSimulator extends AMSimulator {
container.getId());
assignedStreams.put(container.getId(), cs);
se.getNmMap().get(container.getNodeId()).addNewContainer(container,
- cs.getLifeTime());
+ cs.getLifeTime(), appId);
+ getRanNodes().add(container.getNodeId());
}
}
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
index 2ec3976..54311d5 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NMSimulator.java
@@ -250,7 +250,8 @@ public class NMSimulator extends TaskRunner.Task {
/**
* launch a new container with the given life time
*/
- public void addNewContainer(Container container, long lifeTimeMS) {
+ public void addNewContainer(Container container, long lifeTimeMS,
+ ApplicationId applicationId) {
LOG.debug("NodeManager {} launches a new container ({}).",
node.getNodeID(), container.getId());
if (lifeTimeMS != -1) {
@@ -267,6 +268,15 @@ public class NMSimulator extends TaskRunner.Task {
amContainerList.add(container.getId());
}
}
+
+ // update runningApplications on the node
+ if (applicationId != null
+ && !getNode().getRunningApps().contains(applicationId)) {
+ getNode().getRunningApps().add(applicationId);
+ }
+ LOG.debug("Adding running app: {} on node: {}. " +
+ "Updated runningApps on this node are: {}",
+ applicationId, getNode().getNodeID(), getNode().getRunningApps());
}
/**
@@ -296,4 +306,13 @@ public class NMSimulator extends TaskRunner.Task {
List<ContainerId> getCompletedContainers() {
return completedContainerList;
}
+
+ public void finishApplication(ApplicationId applicationId) {
+ if (getNode().getRunningApps().contains(applicationId)) {
+ getNode().getRunningApps().remove(applicationId);
+ LOG.debug("Removed running app: {} from node: {}. " +
+ "Updated runningApps on this node are: {}",
+ applicationId, getNode().getNodeID(), getNode().getRunningApps());
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
index 37bf96a..d284076 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/resourcemanager/MockAMLauncher.java
@@ -104,7 +104,8 @@ public class MockAMLauncher extends ApplicationMasterLauncher
LOG.info("Notify AM launcher launched:" + amContainer.getId());
se.getNmMap().get(amContainer.getNodeId())
- .addNewContainer(amContainer, -1);
+ .addNewContainer(amContainer, -1, appId);
+ ams.getRanNodes().add(amContainer.getNodeId());
return;
} catch (Exception e) {
throw new YarnRuntimeException(e);
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
index ec7c81d..50ac700 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/appmaster/TestAMSimulator.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -32,7 +33,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
+import org.apache.hadoop.yarn.sls.SLSRunner;
import org.apache.hadoop.yarn.sls.conf.SLSConfiguration;
+import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.scheduler.*;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
@@ -41,6 +44,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import org.mockito.Mockito;
import java.io.IOException;
import java.nio.file.Files;
@@ -50,8 +54,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentMap;
+import static org.mockito.Mockito.when;
+
@RunWith(Parameterized.class)
public class TestAMSimulator {
private ResourceManager rm;
@@ -288,6 +295,28 @@ public class TestAMSimulator {
Assert.assertEquals(3, nodeRequestCount);
}
+ @Test
+ public void testAMSimulatorRanNodesCleared() throws Exception {
+ NMSimulator nm = new NMSimulator();
+ nm.init("/rack1/testNode1", Resources.createResource(1024 * 10, 10), 0, 1000,
+ rm, -1f);
+
+ Map<NodeId, NMSimulator> nmMap = new HashMap<>();
+ nmMap.put(nm.getNode().getNodeID(), nm);
+
+ MockAMSimulator app = new MockAMSimulator();
+ app.appId = ApplicationId.newInstance(0l, 1);
+ SLSRunner slsRunner = Mockito.mock(SLSRunner.class);
+ app.se = slsRunner;
+ when(slsRunner.getNmMap()).thenReturn(nmMap);
+ app.getRanNodes().add(nm.getNode().getNodeID());
+ nm.getNode().getRunningApps().add(app.appId);
+ Assert.assertTrue(nm.getNode().getRunningApps().contains(app.appId));
+
+ app.lastStep();
+ Assert.assertFalse(nm.getNode().getRunningApps().contains(app.appId));
+ Assert.assertTrue(nm.getNode().getRunningApps().isEmpty());
+ }
@After
public void tearDown() {
diff --git a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
index 003417e..f82ce91 100644
--- a/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
+++ b/hadoop-tools/hadoop-sls/src/test/java/org/apache/hadoop/yarn/sls/nodemanager/TestNMSimulator.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.yarn.sls.nodemanager;
import java.util.function.Supplier;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -109,7 +111,7 @@ public class TestNMSimulator {
ContainerId cId1 = newContainerId(1, 1, 1);
Container container1 = Container.newInstance(cId1, null, null,
Resources.createResource(GB, 1), null, null);
- node1.addNewContainer(container1, 100000l);
+ node1.addNewContainer(container1, 100000l, null);
Assert.assertTrue("Node1 should have one running container.",
node1.getRunningContainers().containsKey(cId1));
@@ -117,7 +119,7 @@ public class TestNMSimulator {
ContainerId cId2 = newContainerId(2, 1, 1);
Container container2 = Container.newInstance(cId2, null, null,
Resources.createResource(GB, 1), null, null);
- node1.addNewContainer(container2, -1l);
+ node1.addNewContainer(container2, -1l, null);
Assert.assertTrue("Node1 should have one running AM container",
node1.getAMContainers().contains(cId2));
@@ -137,6 +139,100 @@ public class TestNMSimulator {
appAttemptId), cId);
}
+ @Test
+ public void testNMSimAppAddedAndRemoved() throws Exception {
+ // Register one node
+ NMSimulator node = new NMSimulator();
+ node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
+ rm, -1f);
+ node.middleStep();
+
+ int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
+ int cumulativeSleepTime = 0;
+ int sleepInterval = 100;
+
+ while (numClusterNodes != 1 && cumulativeSleepTime < 5000) {
+ Thread.sleep(sleepInterval);
+ cumulativeSleepTime = cumulativeSleepTime + sleepInterval;
+ numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
+ }
+
+ GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return rm.getResourceScheduler().getRootQueueMetrics()
+ .getAvailableMB() > 0;
+ }
+ }, 500, 10000);
+
+ Assert.assertEquals("Node should have no runningApps.",
+ node.getNode().getRunningApps().size(), 0);
+
+ // Allocate one app container on node
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ ApplicationAttemptId appAttemptId =
+ BuilderUtils.newApplicationAttemptId(appId, 1);
+ ContainerId cId = BuilderUtils.newContainerId(appAttemptId, 1);
+ Container container = Container.newInstance(cId, null, null,
+ Resources.createResource(GB, 1), null, null);
+ node.addNewContainer(container, 100000l, appId);
+ Assert.assertTrue("Node should have app: "
+ + appId + " in runningApps list.",
+ node.getNode().getRunningApps().contains(appId));
+
+ // Finish the app on the node.
+ node.finishApplication(appId);
+ Assert.assertFalse("Node should not have app: "
+ + appId + " in runningApps list.",
+ node.getNode().getRunningApps().contains(appId));
+ Assert.assertEquals("Node should have no runningApps.",
+ node.getNode().getRunningApps().size(), 0);
+ }
+
+ @Test
+ public void testNMSimNullAppAddedAndRemoved() throws Exception {
+ // Register one node
+ NMSimulator node = new NMSimulator();
+ node.init("/rack1/node1", Resources.createResource(GB * 10, 10), 0, 1000,
+ rm, -1f);
+ node.middleStep();
+
+ int numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
+ int cumulativeSleepTime = 0;
+ int sleepInterval = 100;
+
+ while (numClusterNodes != 1 && cumulativeSleepTime < 5000) {
+ Thread.sleep(sleepInterval);
+ cumulativeSleepTime = cumulativeSleepTime + sleepInterval;
+ numClusterNodes = rm.getResourceScheduler().getNumClusterNodes();
+ }
+
+ GenericTestUtils.waitFor(new com.google.common.base.Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ return rm.getResourceScheduler().getRootQueueMetrics()
+ .getAvailableMB() > 0;
+ }
+ }, 500, 10000);
+
+ Assert.assertEquals("Node should have no runningApps.",
+ node.getNode().getRunningApps().size(), 0);
+
+ // Allocate null app container on node
+ ContainerId cId = newContainerId(1, 1, 1);
+ Container container = Container.newInstance(cId, null, null,
+ Resources.createResource(GB, 1), null, null);
+ node.addNewContainer(container, 100000l, null);
+ Assert.assertEquals("Node should have no runningApps if appId is null.",
+ node.getNode().getRunningApps().size(), 0);
+
+ // Finish non-existent app on the node.
+ ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ node.finishApplication(appId);
+ Assert.assertEquals("Node should have no runningApps.",
+ node.getNode().getRunningApps().size(), 0);
+ }
+
@After
public void tearDown() throws Exception {
rm.stop();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org