You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2017/04/07 01:59:32 UTC
[36/50] [abbrv] hadoop git commit: YARN-6406. Remove
SchedulerRequestKeys when no more pending ResourceRequest. (Arun Suresh via
wangda)
YARN-6406. Remove SchedulerRequestKeys when no more pending ResourceRequest. (Arun Suresh via wangda)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/55285272
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/55285272
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/55285272
Branch: refs/heads/HDFS-10467
Commit: 55285272c6d3ac4b24748138eed442560f33717a
Parents: 9393209
Author: Wangda Tan <wa...@apache.org>
Authored: Tue Apr 4 14:43:58 2017 -0700
Committer: Inigo <in...@apache.org>
Committed: Thu Apr 6 18:58:22 2017 -0700
----------------------------------------------------------------------
.../scheduler/AppSchedulingInfo.java | 47 ++----
.../LocalitySchedulingPlacementSet.java | 8 +-
.../capacity/TestCapacityScheduler.java | 159 +++++++++++++++++++
.../scheduler/capacity/TestLeafQueue.java | 8 +-
.../webapp/TestRMWebServicesApps.java | 123 ++++++++++----
5 files changed, 266 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55285272/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index bff9c41..4de5eac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -25,12 +25,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -51,9 +47,8 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -86,8 +81,8 @@ public class AppSchedulingInfo {
private Set<String> requestedPartitions = new HashSet<>();
- private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
- schedulerKeys = new ConcurrentSkipListMap<>();
+ private final ConcurrentSkipListSet<SchedulerRequestKey>
+ schedulerKeys = new ConcurrentSkipListSet<>();
final Map<SchedulerRequestKey, SchedulingPlacementSet<SchedulerNode>>
schedulerKeyToPlacementSets = new ConcurrentHashMap<>();
@@ -156,29 +151,6 @@ public class AppSchedulingInfo {
LOG.info("Application " + applicationId + " requests cleared");
}
-
- private void incrementSchedulerKeyReference(
- SchedulerRequestKey schedulerKey) {
- Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
- if (schedulerKeyCount == null) {
- schedulerKeys.put(schedulerKey, 1);
- } else {
- schedulerKeys.put(schedulerKey, schedulerKeyCount + 1);
- }
- }
-
- public void decrementSchedulerKeyReference(
- SchedulerRequestKey schedulerKey) {
- Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
- if (schedulerKeyCount != null) {
- if (schedulerKeyCount > 1) {
- schedulerKeys.put(schedulerKey, schedulerKeyCount - 1);
- } else {
- schedulerKeys.remove(schedulerKey);
- }
- }
- }
-
public ContainerUpdateContext getUpdateContext() {
return updateContext;
}
@@ -230,6 +202,10 @@ public class AppSchedulingInfo {
}
}
+ public void removePlacementSets(SchedulerRequestKey schedulerRequestKey) {
+ schedulerKeyToPlacementSets.remove(schedulerRequestKey);
+ }
+
boolean addToPlacementSets(
boolean recoverPreemptedRequestForAContainer,
Map<SchedulerRequestKey, Map<String, ResourceRequest>> dedupRequests) {
@@ -268,7 +244,8 @@ public class AppSchedulingInfo {
(lastRequest != null) ? lastRequest.getNumContainers() : 0;
if (request.getNumContainers() <= 0) {
if (lastRequestContainers >= 0) {
- decrementSchedulerKeyReference(schedulerKey);
+ schedulerKeys.remove(schedulerKey);
+ schedulerKeyToPlacementSets.remove(schedulerKey);
}
LOG.info("checking for deactivate of application :"
+ this.applicationId);
@@ -276,7 +253,7 @@ public class AppSchedulingInfo {
} else {
// Activate application. Metrics activation is done here.
if (lastRequestContainers <= 0) {
- incrementSchedulerKeyReference(schedulerKey);
+ schedulerKeys.add(schedulerKey);
abstractUsersManager.activateApplication(user, applicationId);
}
}
@@ -366,7 +343,7 @@ public class AppSchedulingInfo {
}
public Collection<SchedulerRequestKey> getSchedulerKeys() {
- return schedulerKeys.keySet();
+ return schedulerKeys;
}
/**
@@ -389,7 +366,7 @@ public class AppSchedulingInfo {
public PendingAsk getNextPendingAsk() {
try {
readLock.lock();
- SchedulerRequestKey firstRequestKey = schedulerKeys.firstKey();
+ SchedulerRequestKey firstRequestKey = schedulerKeys.first();
return getPendingAsk(firstRequestKey, ResourceRequest.ANY);
} finally {
readLock.unlock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55285272/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
index c32246d..6cc8cc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalitySchedulingPlacementSet.java
@@ -204,15 +204,17 @@ public class LocalitySchedulingPlacementSet<N extends SchedulerNode>
private void decrementOutstanding(SchedulerRequestKey schedulerRequestKey,
ResourceRequest offSwitchRequest) {
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
-
- // Do not remove ANY
offSwitchRequest.setNumContainers(numOffSwitchContainers);
// Do we have any outstanding requests?
// If there is nothing, we need to deactivate this application
if (numOffSwitchContainers == 0) {
- appSchedulingInfo.decrementSchedulerKeyReference(schedulerRequestKey);
+ appSchedulingInfo.getSchedulerKeys().remove(schedulerRequestKey);
appSchedulingInfo.checkForDeactivation();
+ resourceRequestMap.remove(ResourceRequest.ANY);
+ if (resourceRequestMap.isEmpty()) {
+ appSchedulingInfo.removePlacementSets(schedulerRequestKey);
+ }
}
appSchedulingInfo.decPendingResource(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55285272/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index e2f456c..447ee3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -31,6 +31,7 @@ import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -86,6 +88,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
@@ -2947,6 +2950,162 @@ public class TestCapacityScheduler {
}
@Test
+ public void testSchedulerKeyGarbageCollection() throws Exception {
+ YarnConfiguration conf =
+ new YarnConfiguration(new CapacitySchedulerConfiguration());
+ conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ MockRM rm = new MockRM(conf, memStore);
+ rm.start();
+
+ HashMap<NodeId, MockNM> nodes = new HashMap<>();
+ MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm1.getNodeId(), nm1);
+ MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm2.getNodeId(), nm2);
+ MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm3.getNodeId(), nm3);
+ MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+ nodes.put(nm4.getNodeId(), nm4);
+ nm1.registerNode();
+ nm2.registerNode();
+ nm3.registerNode();
+ nm4.registerNode();
+
+ RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+ ApplicationAttemptId attemptId =
+ app1.getCurrentAppAttempt().getAppAttemptId();
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+ ResourceScheduler scheduler = rm.getResourceScheduler();
+
+ // All nodes 1 - 4 will be applicable for scheduling.
+ nm1.nodeHeartbeat(true);
+ nm2.nodeHeartbeat(true);
+ nm3.nodeHeartbeat(true);
+ nm4.nodeHeartbeat(true);
+
+ Thread.sleep(1000);
+
+ AllocateResponse allocateResponse = am1.allocate(
+ Arrays.asList(
+ newResourceRequest(1, 1, ResourceRequest.ANY,
+ Resources.createResource(3 * GB), 1, true,
+ ExecutionType.GUARANTEED),
+ newResourceRequest(2, 2, ResourceRequest.ANY,
+ Resources.createResource(3 * GB), 1, true,
+ ExecutionType.GUARANTEED),
+ newResourceRequest(3, 3, ResourceRequest.ANY,
+ Resources.createResource(3 * GB), 1, true,
+ ExecutionType.GUARANTEED),
+ newResourceRequest(4, 4, ResourceRequest.ANY,
+ Resources.createResource(3 * GB), 1, true,
+ ExecutionType.GUARANTEED)
+ ),
+ null);
+ List<Container> allocatedContainers = allocateResponse
+ .getAllocatedContainers();
+ Assert.assertEquals(0, allocatedContainers.size());
+
+ Collection<SchedulerRequestKey> schedulerKeys =
+ ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
+ .getAppSchedulingInfo().getSchedulerKeys();
+ Assert.assertEquals(4, schedulerKeys.size());
+
+ // Get a Node to HB... at which point 1 container should be
+ // allocated
+ nm1.nodeHeartbeat(true);
+ Thread.sleep(200);
+ allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+ allocatedContainers = allocateResponse.getAllocatedContainers();
+ Assert.assertEquals(1, allocatedContainers.size());
+
+ // Verify 1 outstanding schedulerKey is removed
+ Assert.assertEquals(3, schedulerKeys.size());
+
+ List <ResourceRequest> resReqs =
+ ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
+ .getAppSchedulingInfo().getAllResourceRequests();
+
+ // Verify 1 outstanding schedulerKey is removed from the
+ // rrMap as well
+ Assert.assertEquals(3, resReqs.size());
+
+ // Verify One more container Allocation on node nm2
+ // And ensure the outstanding schedulerKeys go down..
+ nm2.nodeHeartbeat(true);
+ Thread.sleep(200);
+
+ // Update the allocateReq to send 0 numContainer req.
+ // For the satisfied container...
+ allocateResponse = am1.allocate(Arrays.asList(
+ newResourceRequest(1,
+ allocatedContainers.get(0).getAllocationRequestId(),
+ ResourceRequest.ANY,
+ Resources.createResource(3 * GB), 0, true,
+ ExecutionType.GUARANTEED)
+ ),
+ new ArrayList<>());
+ allocatedContainers = allocateResponse.getAllocatedContainers();
+ Assert.assertEquals(1, allocatedContainers.size());
+
+ // Verify 1 outstanding schedulerKey is removed
+ Assert.assertEquals(2, schedulerKeys.size());
+
+ resReqs = ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
+ .getAppSchedulingInfo().getAllResourceRequests();
+ // Verify the map size is not increased due to 0 req
+ Assert.assertEquals(2, resReqs.size());
+
+ // Now Verify that the AM can cancel 1 Ask:
+ SchedulerRequestKey sk = schedulerKeys.iterator().next();
+ am1.allocate(
+ Arrays.asList(
+ newResourceRequest(sk.getPriority().getPriority(),
+ sk.getAllocationRequestId(),
+ ResourceRequest.ANY, Resources.createResource(3 * GB), 0, true,
+ ExecutionType.GUARANTEED)
+ ),
+ null);
+
+ schedulerKeys =
+ ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
+ .getAppSchedulingInfo().getSchedulerKeys();
+
+ Thread.sleep(200);
+
+ // Verify 1 outstanding schedulerKey is removed because of the
+ // cancel ask
+ Assert.assertEquals(1, schedulerKeys.size());
+
+ // Now verify that after the next node heartbeat, we allocate
+ // the last schedulerKey
+ nm3.nodeHeartbeat(true);
+ Thread.sleep(200);
+ allocateResponse = am1.allocate(new ArrayList<>(), new ArrayList<>());
+ allocatedContainers = allocateResponse.getAllocatedContainers();
+ Assert.assertEquals(1, allocatedContainers.size());
+
+ // Verify no more outstanding schedulerKeys..
+ Assert.assertEquals(0, schedulerKeys.size());
+ resReqs =
+ ((CapacityScheduler) scheduler).getApplicationAttempt(attemptId)
+ .getAppSchedulingInfo().getAllResourceRequests();
+ Assert.assertEquals(0, resReqs.size());
+ }
+
+ private static ResourceRequest newResourceRequest(int priority,
+ long allocReqId, String rName, Resource resource, int numContainers,
+ boolean relaxLoc, ExecutionType eType) {
+ ResourceRequest rr = ResourceRequest.newInstance(
+ Priority.newInstance(priority), rName, resource, numContainers,
+ relaxLoc, null, ExecutionTypeRequest.newInstance(eType, true));
+ rr.setAllocationRequestId(allocReqId);
+ return rr;
+ }
+
+ @Test
public void testHierarchyQueuesCurrentLimits() throws Exception {
/*
* Queue tree:
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55285272/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 1162b9f..252666d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -1059,13 +1059,9 @@ public class TestLeafQueue {
//test case 3
qb.finishApplication(app_0.getApplicationId(), user_0);
qb.finishApplication(app_2.getApplicationId(), user_1);
- qb.releaseResource(clusterResource, app_0,
- app_0.getAppSchedulingInfo().getPendingAsk(u0SchedKey)
- .getPerAllocationResource(),
+ qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1),
null, null);
- qb.releaseResource(clusterResource, app_2,
- app_2.getAppSchedulingInfo().getPendingAsk(u1SchedKey)
- .getPerAllocationResource(),
+ qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1),
null, null);
qb.setUserLimit(50);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/55285272/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
index fb9e8ed..aab9bee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.StringReader;
+import java.util.ArrayList;
import java.util.Collection;
import javax.ws.rs.core.MediaType;
@@ -46,6 +47,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@@ -169,7 +172,38 @@ public class TestRMWebServicesApps extends JerseyTestBase {
assertEquals("incorrect number of elements", 1, nodesApps.getLength());
NodeList nodes = dom.getElementsByTagName("app");
assertEquals("incorrect number of elements", 1, nodes.getLength());
- verifyAppsXML(nodes, app1);
+ verifyAppsXML(nodes, app1, false);
+ rm.stop();
+ }
+
+ @Test
+ public void testRunningApp() throws JSONException, Exception {
+ rm.start();
+ MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 2048);
+ RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, amNodeManager);
+ am1.allocate("*", 4096, 1, new ArrayList<>());
+ amNodeManager.nodeHeartbeat(true);
+
+ WebResource r = resource();
+ ClientResponse response = r.path("ws").path("v1").path("cluster")
+ .path("apps").accept(MediaType.APPLICATION_XML)
+ .get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_XML_TYPE + "; " + JettyUtils.UTF_8,
+ response.getType().toString());
+ String xml = response.getEntity(String.class);
+ DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
+ DocumentBuilder db = dbf.newDocumentBuilder();
+ InputSource is = new InputSource();
+ is.setCharacterStream(new StringReader(xml));
+ Document dom = db.parse(is);
+ NodeList nodesApps = dom.getElementsByTagName("apps");
+ assertEquals("incorrect number of elements", 1, nodesApps.getLength());
+ NodeList nodes = dom.getElementsByTagName("app");
+ assertEquals("incorrect number of elements", 1, nodes.getLength());
+ verifyAppsXML(nodes, app1, true);
+
+ testAppsHelper("apps/", app1, MediaType.APPLICATION_JSON, true);
rm.stop();
}
@@ -203,6 +237,11 @@ public class TestRMWebServicesApps extends JerseyTestBase {
public void testAppsHelper(String path, RMApp app, String media)
throws JSONException, Exception {
+ testAppsHelper(path, app, media, false);
+ }
+
+ public void testAppsHelper(String path, RMApp app, String media,
+ boolean hasResourceReq) throws JSONException, Exception {
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1").path("cluster")
@@ -215,7 +254,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
assertEquals("incorrect number of elements", 1, apps.length());
JSONArray array = apps.getJSONArray("app");
assertEquals("incorrect number of elements", 1, array.length());
- verifyAppInfo(array.getJSONObject(0), app);
+ verifyAppInfo(array.getJSONObject(0), app, hasResourceReq);
}
@@ -239,7 +278,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
assertEquals("incorrect number of elements", 1, apps.length());
JSONArray array = apps.getJSONArray("app");
assertEquals("incorrect number of elements", 1, array.length());
- verifyAppInfo(array.getJSONObject(0), app1);
+ verifyAppInfo(array.getJSONObject(0), app1, false);
rm.stop();
}
@@ -483,7 +522,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
assertEquals("incorrect number of elements", 1, apps.length());
JSONArray array = apps.getJSONArray("app");
assertEquals("incorrect number of elements", 1, array.length());
- verifyAppInfo(array.getJSONObject(0), app1);
+ verifyAppInfo(array.getJSONObject(0), app1, false);
rm.stop();
}
@@ -1327,7 +1366,7 @@ public class TestRMWebServicesApps extends JerseyTestBase {
JSONObject json = response.getEntity(JSONObject.class);
assertEquals("incorrect number of elements", 1, json.length());
- verifyAppInfo(json.getJSONObject("app"), app);
+ verifyAppInfo(json.getJSONObject("app"), app, false);
}
@Test
@@ -1351,11 +1390,11 @@ public class TestRMWebServicesApps extends JerseyTestBase {
Document dom = db.parse(is);
NodeList nodes = dom.getElementsByTagName("app");
assertEquals("incorrect number of elements", 1, nodes.getLength());
- verifyAppsXML(nodes, app1);
+ verifyAppsXML(nodes, app1, false);
rm.stop();
}
- public void verifyAppsXML(NodeList nodes, RMApp app)
+ public void verifyAppsXML(NodeList nodes, RMApp app, boolean hasResourceReq)
throws JSONException, Exception {
for (int i = 0; i < nodes.getLength(); i++) {
@@ -1394,32 +1433,38 @@ public class TestRMWebServicesApps extends JerseyTestBase {
WebServicesTestUtils.getXmlString(element, "amNodeLabelExpression"),
WebServicesTestUtils.getXmlString(element, "amRPCAddress"));
- assertEquals(element.getElementsByTagName("resourceRequests").getLength(),
- 1);
- Element resourceRequests =
- (Element) element.getElementsByTagName("resourceRequests").item(0);
- Element capability =
- (Element) resourceRequests.getElementsByTagName("capability").item(0);
-
- verifyResourceRequestsGeneric(app,
- WebServicesTestUtils.getXmlString(resourceRequests,
- "nodeLabelExpression"),
- WebServicesTestUtils.getXmlInt(resourceRequests, "numContainers"),
- WebServicesTestUtils.getXmlBoolean(resourceRequests, "relaxLocality"),
- WebServicesTestUtils.getXmlInt(resourceRequests, "priority"),
- WebServicesTestUtils.getXmlString(resourceRequests, "resourceName"),
- WebServicesTestUtils.getXmlLong(capability, "memory"),
- WebServicesTestUtils.getXmlLong(capability, "vCores"),
- WebServicesTestUtils.getXmlString(resourceRequests, "executionType"),
- WebServicesTestUtils.getXmlBoolean(resourceRequests,
- "enforceExecutionType"));
+ if (hasResourceReq) {
+ assertEquals(element.getElementsByTagName("resourceRequests").getLength(),
+ 1);
+ Element resourceRequests =
+ (Element) element.getElementsByTagName("resourceRequests").item(0);
+ Element capability =
+ (Element) resourceRequests.getElementsByTagName("capability").item(0);
+ ResourceRequest rr =
+ ((AbstractYarnScheduler)rm.getRMContext().getScheduler())
+ .getApplicationAttempt(
+ app.getCurrentAppAttempt().getAppAttemptId())
+ .getAppSchedulingInfo().getAllResourceRequests().get(0);
+ verifyResourceRequestsGeneric(rr,
+ WebServicesTestUtils.getXmlString(resourceRequests,
+ "nodeLabelExpression"),
+ WebServicesTestUtils.getXmlInt(resourceRequests, "numContainers"),
+ WebServicesTestUtils.getXmlBoolean(resourceRequests, "relaxLocality"),
+ WebServicesTestUtils.getXmlInt(resourceRequests, "priority"),
+ WebServicesTestUtils.getXmlString(resourceRequests, "resourceName"),
+ WebServicesTestUtils.getXmlLong(capability, "memory"),
+ WebServicesTestUtils.getXmlLong(capability, "vCores"),
+ WebServicesTestUtils.getXmlString(resourceRequests, "executionType"),
+ WebServicesTestUtils.getXmlBoolean(resourceRequests,
+ "enforceExecutionType"));
+ }
}
}
- public void verifyAppInfo(JSONObject info, RMApp app) throws JSONException,
- Exception {
+ public void verifyAppInfo(JSONObject info, RMApp app, boolean hasResourceReqs)
+ throws JSONException, Exception {
- int expectedNumberOfElements = 35;
+ int expectedNumberOfElements = 34 + (hasResourceReqs ? 2 : 0);
String appNodeLabelExpression = null;
String amNodeLabelExpression = null;
if (app.getApplicationSubmissionContext()
@@ -1461,7 +1506,9 @@ public class TestRMWebServicesApps extends JerseyTestBase {
amNodeLabelExpression,
amRPCAddress);
- verifyResourceRequests(info.getJSONArray("resourceRequests"), app);
+ if (hasResourceReqs) {
+ verifyResourceRequests(info.getJSONArray("resourceRequests"), app);
+ }
}
public void verifyAppInfoGeneric(RMApp app, String id, String user,
@@ -1490,8 +1537,10 @@ public class TestRMWebServicesApps extends JerseyTestBase {
WebServicesTestUtils.checkStringMatch("finalStatus", app
.getFinalApplicationStatus().toString(), finalStatus);
assertEquals("progress doesn't match", 0, progress, 0.0);
- WebServicesTestUtils.checkStringMatch("trackingUI", "UNASSIGNED",
- trackingUI);
+ if ("UNASSIGNED".equals(trackingUI)) {
+ WebServicesTestUtils.checkStringMatch("trackingUI", "UNASSIGNED",
+ trackingUI);
+ }
WebServicesTestUtils.checkStringEqual("diagnostics",
app.getDiagnostics().toString(), diagnostics);
assertEquals("clusterId doesn't match",
@@ -1544,7 +1593,12 @@ public class TestRMWebServicesApps extends JerseyTestBase {
public void verifyResourceRequests(JSONArray resourceRequest, RMApp app)
throws JSONException {
JSONObject requestInfo = resourceRequest.getJSONObject(0);
- verifyResourceRequestsGeneric(app,
+ ResourceRequest rr =
+ ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+ .getApplicationAttempt(
+ app.getCurrentAppAttempt().getAppAttemptId())
+ .getAppSchedulingInfo().getAllResourceRequests().get(0);
+ verifyResourceRequestsGeneric(rr,
requestInfo.getString("nodeLabelExpression"),
requestInfo.getInt("numContainers"),
requestInfo.getBoolean("relaxLocality"), requestInfo.getInt("priority"),
@@ -1557,11 +1611,10 @@ public class TestRMWebServicesApps extends JerseyTestBase {
.getBoolean("enforceExecutionType"));
}
- public void verifyResourceRequestsGeneric(RMApp app,
+ public void verifyResourceRequestsGeneric(ResourceRequest request,
String nodeLabelExpression, int numContainers, boolean relaxLocality,
int priority, String resourceName, long memory, long vCores,
String executionType, boolean enforceExecutionType) {
- ResourceRequest request = app.getAMResourceRequests().get(0);
assertEquals("nodeLabelExpression doesn't match",
request.getNodeLabelExpression(), nodeLabelExpression);
assertEquals("numContainers doesn't match", request.getNumContainers(),
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org