You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2019/12/18 21:37:17 UTC
[hadoop] branch branch-3.1 updated: YARN-9894.
CapacitySchedulerPerf test for measuring hundreds of apps in a large number
of queues. Contributed by Eric Payne
This is an automated email from the ASF dual-hosted git repository.
jhung pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 750fb4c YARN-9894. CapacitySchedulerPerf test for measuring hundreds of apps in a large number of queues. Contributed by Eric Payne
750fb4c is described below
commit 750fb4c3212e7c197f418ea2df711be4467ee27a
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Dec 18 13:18:11 2019 -0800
YARN-9894. CapacitySchedulerPerf test for measuring hundreds of apps in a large number of queues. Contributed by Eric Payne
(cherry picked from commit 7b93575b92e8bad889c1ef15e0baaade6de6de4d)
(cherry picked from commit 0707d0a0ae36456f3467cbb408c3a9a0073c70f7)
---
.../capacity/TestCapacitySchedulerPerf.java | 176 ++++++++++++++++-----
1 file changed, 136 insertions(+), 40 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
index a2ccf6e..bdc03f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptMetrics;
@@ -60,6 +61,9 @@ import java.util.Map;
import java.util.PriorityQueue;
import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -70,9 +74,22 @@ public class TestCapacitySchedulerPerf {
return "resource-" + idx;
}
+ // This test is run only when -DRunCapacitySchedulerPerfTests=true is set
+ // on the command line. In addition, this test has tunables for the following:
+ // Number of queues: -DNumberOfQueues (default=40)
+ // Number of total apps: -DNumberOfApplications (default=100)
+ // Percentage of queues with apps: -DPercentActiveQueues (default=100)
+ // E.G.:
+ // mvn test -Dtest=TestCapacitySchedulerPerf -Dsurefire.fork.timeout=1800 \
+ // -DRunCapacitySchedulerPerfTests=true -DNumberOfQueues=50 \
+ // -DNumberOfApplications=200 -DPercentActiveQueues=100
+ // Note that the surefire.fork.timeout flag is added because these tests could
+ // take longer than the surefire timeout.
private void testUserLimitThroughputWithNumberOfResourceTypes(
- int numOfResourceTypes)
+ int numOfResourceTypes, int numQueues, int pctActiveQueues, int appCount)
throws Exception {
+ Assume.assumeTrue(Boolean.valueOf(
+ System.getProperty("RunCapacitySchedulerPerfTests")));
if (numOfResourceTypes > 2) {
// Initialize resource map
Map<String, ResourceInformation> riMap = new HashMap<>();
@@ -91,22 +108,16 @@ public class TestCapacitySchedulerPerf {
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
}
- // Since this is more of a performance unit test, only run if
- // RunUserLimitThroughput is set (-DRunUserLimitThroughput=true)
- Assume.assumeTrue(Boolean.valueOf(
- System.getProperty("RunCapacitySchedulerPerfTests")));
+ final int activeQueues = (int) (numQueues * (pctActiveQueues/100f));
+ final int totalApps = appCount + activeQueues;
+ // extra apps to get started with user limit
CapacitySchedulerConfiguration csconf =
- new CapacitySchedulerConfiguration();
- csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
- csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
- csconf.setMaximumApplicationMasterResourcePerQueuePercent("root.default",
- 100.0f);
- csconf.setMaximumAMResourcePercentPerPartition("root.default", "", 100.0f);
- csconf.setResourceComparator(DominantResourceCalculator.class);
+ createCSConfWithManyQueues(numQueues);
YarnConfiguration conf = new YarnConfiguration(csconf);
- // Don't reset resource types since we have already configured resource types
+ // Don't reset resource types since we have already configured resource
+ // types
conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
@@ -115,11 +126,16 @@ public class TestCapacitySchedulerPerf {
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
- LeafQueue qb = (LeafQueue)cs.getQueue("default");
- // For now make user limit large so we can activate all applications
- qb.setUserLimitFactor((float)100.0);
- qb.setupConfigurableCapacities();
+ LeafQueue[] lqs = new LeafQueue[numQueues];
+ for (int i = 0; i < numQueues; i++) {
+ String queueName = String.format("%03d", i);
+ LeafQueue qb = (LeafQueue)cs.getQueue(queueName);
+ // For now make user limit large so we can activate all applications
+ qb.setUserLimitFactor((float)100.0);
+ qb.setupConfigurableCapacities();
+ lqs[i] = qb;
+ }
SchedulerEvent addAppEvent;
SchedulerEvent addAttemptEvent;
@@ -127,13 +143,12 @@ public class TestCapacitySchedulerPerf {
ApplicationSubmissionContext submissionContext =
mock(ApplicationSubmissionContext.class);
- final int appCount = 100;
- ApplicationId[] appids = new ApplicationId[appCount];
- RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[appCount];
- ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[appCount];
- RMAppImpl[] apps = new RMAppImpl[appCount];
- RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[appCount];
- for (int i=0; i<appCount; i++) {
+ ApplicationId[] appids = new ApplicationId[totalApps];
+ RMAppAttemptImpl[] attempts = new RMAppAttemptImpl[totalApps];
+ ApplicationAttemptId[] appAttemptIds = new ApplicationAttemptId[totalApps];
+ RMAppImpl[] apps = new RMAppImpl[totalApps];
+ RMAppAttemptMetrics[] attemptMetrics = new RMAppAttemptMetrics[totalApps];
+ for (int i=0; i<totalApps; i++) {
appids[i] = BuilderUtils.newApplicationId(100, i);
appAttemptIds[i] =
BuilderUtils.newApplicationAttemptId(appids[i], 1);
@@ -150,34 +165,34 @@ public class TestCapacitySchedulerPerf {
when(apps[i].getCurrentAppAttempt()).thenReturn(attempts[i]);
rm.getRMContext().getRMApps().put(appids[i], apps[i]);
+ String queueName = lqs[i % activeQueues].getQueueName();
addAppEvent =
- new AppAddedSchedulerEvent(appids[i], "default", "user1");
+ new AppAddedSchedulerEvent(appids[i], queueName, "user1");
cs.handle(addAppEvent);
addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptIds[i], false);
cs.handle(addAttemptEvent);
}
- // add nodes to cluster, so cluster has 20GB and 20 vcores
- Resource nodeResource = Resource.newInstance(10 * GB, 10);
+ // add nodes to cluster with enough resources to satisfy all apps
+ Resource newResource = Resource.newInstance(totalApps * GB, totalApps);
if (numOfResourceTypes > 2) {
for (int i = 2; i < numOfResourceTypes; i++) {
- nodeResource.setResourceValue(getResourceName(i), 10);
+ newResource.setResourceValue(getResourceName(i), totalApps);
}
}
-
- RMNode node = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.1");
+ RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
cs.handle(new NodeAddedSchedulerEvent(node));
- RMNode node2 = MockNodes.newNodeInfo(0, nodeResource, 1, "127.0.0.2");
+ RMNode node2 = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.2");
cs.handle(new NodeAddedSchedulerEvent(node2));
Priority u0Priority = TestUtils.createMockPriority(1);
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[appCount];
- for (int i=0;i<appCount;i++) {
+ FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
+ for (int i=0;i<totalApps;i++) {
fiCaApps[i] =
cs.getSchedulerApplications().get(apps[i].getApplicationId())
.getCurrentAppAttempt();
@@ -195,8 +210,30 @@ public class TestCapacitySchedulerPerf {
fiCaApps[i].updateResourceRequests(
Collections.singletonList(resourceRequest));
}
- // Now force everything to be over user limit
- qb.setUserLimitFactor((float)0.0);
+ // Now force everything to be at user limit
+ for (int i = 0; i < numQueues; i++) {
+ lqs[i].setUserLimitFactor((float)0.0);
+ }
+
+ // allocate one container for each extra app since
+ // LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
+ cs.handle(new NodeUpdateSchedulerEvent(node));
+ cs.handle(new NodeUpdateSchedulerEvent(node2));
+
+ // make sure only the extra apps have allocated containers
+ for (int i=0;i<totalApps;i++) {
+ boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+ if (i < activeQueues) {
+ assertFalse(pending);
+ assertEquals(0,
+ fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+ } else {
+ assertTrue(pending);
+ assertEquals(1*GB,
+ fiCaApps[i].getTotalPendingRequestsPerPartition()
+ .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+ }
+ }
// Quiet the loggers while measuring throughput
for (Enumeration<?> loggers = LogManager.getCurrentLoggers();
@@ -239,27 +276,86 @@ public class TestCapacitySchedulerPerf {
}
System.out.println(
"#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
- + ": " + numerator / (timespent / entries));
+ + ": " + numerator / (timespent / entries) + " ops/sec of "
+ + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
+ + " queues.");
+
+ // make sure only the extra apps have allocated containers
+ for (int i=0;i<totalApps;i++) {
+ boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+ if (i < activeQueues) {
+ assertFalse(pending);
+ assertEquals(0,
+ fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+ } else {
+ assertTrue(pending);
+ assertEquals(1*GB,
+ fiCaApps[i].getTotalPendingRequestsPerPartition()
+ .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+ }
+ }
+
+ rm.close();
rm.stop();
}
@Test(timeout = 300000)
public void testUserLimitThroughputForTwoResources() throws Exception {
- testUserLimitThroughputWithNumberOfResourceTypes(2);
+ testUserLimitThroughputWithNumberOfResourceTypes(2, 1, 100, 100);
}
@Test(timeout = 300000)
public void testUserLimitThroughputForThreeResources() throws Exception {
- testUserLimitThroughputWithNumberOfResourceTypes(3);
+ testUserLimitThroughputWithNumberOfResourceTypes(3, 1, 100, 100);
}
@Test(timeout = 300000)
public void testUserLimitThroughputForFourResources() throws Exception {
- testUserLimitThroughputWithNumberOfResourceTypes(4);
+ testUserLimitThroughputWithNumberOfResourceTypes(4, 1, 100, 100);
}
@Test(timeout = 300000)
public void testUserLimitThroughputForFiveResources() throws Exception {
- testUserLimitThroughputWithNumberOfResourceTypes(5);
+ testUserLimitThroughputWithNumberOfResourceTypes(5, 1, 100, 100);
+ }
+
+ @Test(timeout = 1800000)
+ public void testUserLimitThroughputWithManyQueues() throws Exception {
+
+ int numQueues = Integer.getInteger("NumberOfQueues", 40);
+ int pctActiveQueues = Integer.getInteger("PercentActiveQueues", 100);
+ int appCount = Integer.getInteger("NumberOfApplications", 100);
+
+ testUserLimitThroughputWithNumberOfResourceTypes(
+ 2, numQueues, pctActiveQueues, appCount);
+ }
+
+ CapacitySchedulerConfiguration createCSConfWithManyQueues(int numQueues)
+ throws Exception {
+ CapacitySchedulerConfiguration csconf =
+ new CapacitySchedulerConfiguration();
+ csconf.setResourceComparator(DominantResourceCalculator.class);
+ csconf.setMaximumApplicationMasterResourcePerQueuePercent("root", 100.0f);
+ csconf.setMaximumAMResourcePercentPerPartition("root", "", 100.0f);
+ csconf.setCapacity("root.default", 0.0f);
+ csconf.setOffSwitchPerHeartbeatLimit(numQueues);
+
+ float capacity = 100.0f / numQueues;
+ String[] subQueues = new String[numQueues];
+ for (int i = 0; i < numQueues; i++) {
+ String queueName = String.format("%03d", i);
+ String queuePath = "root." + queueName;
+ subQueues[i] = queueName;
+ csconf.setMaximumApplicationMasterResourcePerQueuePercent(
+ queuePath, 100.0f);
+ csconf.setMaximumAMResourcePercentPerPartition(queuePath, "", 100.0f);
+ csconf.setCapacity(queuePath, capacity);
+ csconf.setUserLimitFactor(queuePath, 100.0f);
+ csconf.setMaximumCapacity(queuePath, 100.0f);
+ }
+
+ csconf.setQueues("root", subQueues);
+
+ return csconf;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org