You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2019/04/15 16:20:22 UTC
[hadoop] branch trunk updated: YARN-9439. Support asynchronized
scheduling mode and multi-node lookup mechanism for app activities.
Contributed by Tao Yang.
This is an automated email from the ASF dual-hosted git repository.
wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7fa73fa YARN-9439. Support asynchronized scheduling mode and multi-node lookup mechanism for app activities. Contributed by Tao Yang.
7fa73fa is described below
commit 7fa73fac2676875561269e9408215e012269a18c
Author: Weiwei Yang <ww...@apache.org>
AuthorDate: Tue Apr 16 00:12:43 2019 +0800
YARN-9439. Support asynchronized scheduling mode and multi-node lookup mechanism for app activities. Contributed by Tao Yang.
---
.../scheduler/activities/ActivitiesLogger.java | 20 ++---
.../scheduler/activities/ActivitiesManager.java | 86 ++++++++++++----------
.../scheduler/activities/AppAllocation.java | 2 +-
.../resourcemanager/webapp/RMWebServices.java | 1 +
.../activities/TestActivitiesManager.java | 52 +++++++++++++
...esSchedulerActivitiesWithMultiNodesEnabled.java | 66 +++++++++++++++++
6 files changed, 179 insertions(+), 48 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
index 46ca4bd..7801109 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -63,10 +63,10 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic) {
- NodeId nodeId = getRecordingNodeId(activitiesManager, node);
- if (nodeId == null) {
+ if (activitiesManager == null) {
return;
}
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (activitiesManager.shouldRecordThisNode(nodeId)) {
recordActivity(activitiesManager, nodeId, application.getQueueName(),
application.getApplicationId().toString(), priority,
@@ -85,10 +85,10 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic, ActivityState appState) {
- NodeId nodeId = getRecordingNodeId(activitiesManager, node);
- if (nodeId == null) {
+ if (activitiesManager == null) {
return;
}
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (activitiesManager.shouldRecordThisNode(nodeId)) {
String type = "container";
// Add application-container activity into specific node allocation.
@@ -123,10 +123,10 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, RMContainer updatedContainer,
ActivityState activityState) {
- NodeId nodeId = getRecordingNodeId(activitiesManager, node);
- if (nodeId == null) {
+ if (activitiesManager == null) {
return;
}
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (activitiesManager.shouldRecordThisNode(nodeId)) {
String type = "container";
// Add application-container activity into specific node allocation.
@@ -163,10 +163,10 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, FiCaSchedulerNode node,
long currentTime,
SchedulerApplicationAttempt application) {
- NodeId nodeId = getRecordingNodeId(activitiesManager, node);
- if (nodeId == null) {
+ if (activitiesManager == null) {
return;
}
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
activitiesManager
.startAppAllocationRecording(nodeId, currentTime,
application);
@@ -214,10 +214,10 @@ public class ActivitiesLogger {
public static void recordQueueActivity(ActivitiesManager activitiesManager,
SchedulerNode node, String parentQueueName, String queueName,
ActivityState state, String diagnostic) {
- NodeId nodeId = getRecordingNodeId(activitiesManager, node);
- if (nodeId == null) {
+ if (activitiesManager == null) {
return;
}
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
if (activitiesManager.shouldRecordThisNode(nodeId)) {
recordActivity(activitiesManager, nodeId, parentQueueName, queueName,
null, state, diagnostic, null);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index 740e974..99ee48ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInf
import org.apache.hadoop.yarn.util.SystemClock;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.List;
import java.util.Set;
@@ -57,9 +59,10 @@ public class ActivitiesManager extends AbstractService {
private Set<NodeId> activeRecordedNodes;
private ConcurrentMap<ApplicationId, Long>
recordingAppActivitiesUntilSpecifiedTime;
- private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation;
- private ConcurrentMap<ApplicationId, List<AppAllocation>>
- completedAppAllocations;
+ private ThreadLocal<Map<ApplicationId, AppAllocation>>
+ appsAllocation;
+ @VisibleForTesting
+ ConcurrentMap<ApplicationId, Queue<AppAllocation>> completedAppAllocations;
private boolean recordNextAvailableNode = false;
private List<NodeAllocation> lastAvailableNodeActivities = null;
private Thread cleanUpThread;
@@ -71,7 +74,7 @@ public class ActivitiesManager extends AbstractService {
super(ActivitiesManager.class.getName());
recordingNodesAllocation = ThreadLocal.withInitial(() -> new HashMap());
completedNodeAllocations = new ConcurrentHashMap<>();
- appsAllocation = new ConcurrentHashMap<>();
+ appsAllocation = ThreadLocal.withInitial(() -> new HashMap());
completedAppAllocations = new ConcurrentHashMap<>();
activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
@@ -79,11 +82,15 @@ public class ActivitiesManager extends AbstractService {
}
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
- if (rmContext.getRMApps().get(applicationId).getFinalApplicationStatus()
+ RMApp app = rmContext.getRMApps().get(applicationId);
+ if (app != null && app.getFinalApplicationStatus()
== FinalApplicationStatus.UNDEFINED) {
- List<AppAllocation> allocations = completedAppAllocations.get(
- applicationId);
-
+ Queue<AppAllocation> curAllocations =
+ completedAppAllocations.get(applicationId);
+ List<AppAllocation> allocations = null;
+ if (curAllocations != null) {
+ allocations = new ArrayList(curAllocations);
+ }
return new AppActivitiesInfo(allocations, applicationId);
} else {
return new AppActivitiesInfo(
@@ -135,13 +142,13 @@ public class ActivitiesManager extends AbstractService {
}
}
- Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp =
+ Iterator<Map.Entry<ApplicationId, Queue<AppAllocation>>> iteApp =
completedAppAllocations.entrySet().iterator();
while (iteApp.hasNext()) {
- Map.Entry<ApplicationId, List<AppAllocation>> appAllocation =
+ Map.Entry<ApplicationId, Queue<AppAllocation>> appAllocation =
iteApp.next();
- if (rmContext.getRMApps().get(appAllocation.getKey())
- .getFinalApplicationStatus()
+ RMApp rmApp = rmContext.getRMApps().get(appAllocation.getKey());
+ if (rmApp == null || rmApp.getFinalApplicationStatus()
!= FinalApplicationStatus.UNDEFINED) {
iteApp.remove();
}
@@ -191,18 +198,16 @@ public class ActivitiesManager extends AbstractService {
SchedulerApplicationAttempt application) {
ApplicationId applicationId = application.getApplicationId();
- if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
- && recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
- > currTS) {
- appsAllocation.put(applicationId,
- new AppAllocation(application.getPriority(), nodeID,
- application.getQueueName()));
- }
-
- if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
- && recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
- <= currTS) {
- turnOffActivityMonitoringForApp(applicationId);
+ Long turnOffTimestamp =
+ recordingAppActivitiesUntilSpecifiedTime.get(applicationId);
+ if (turnOffTimestamp != null) {
+ if (turnOffTimestamp > currTS) {
+ appsAllocation.get().put(applicationId,
+ new AppAllocation(application.getPriority(), nodeID,
+ application.getQueueName()));
+ } else {
+ turnOffActivityMonitoringForApp(applicationId);
+ }
}
}
@@ -223,7 +228,7 @@ public class ActivitiesManager extends AbstractService {
ContainerId containerId, String priority, ActivityState state,
String diagnostic, String type) {
if (shouldRecordThisApp(applicationId)) {
- AppAllocation appAllocation = appsAllocation.get(applicationId);
+ AppAllocation appAllocation = appsAllocation.get().get(applicationId);
appAllocation.addAppAllocationActivity(containerId == null ?
"Container-Id-Not-Assigned" :
containerId.toString(), priority, state, diagnostic, type);
@@ -245,24 +250,27 @@ public class ActivitiesManager extends AbstractService {
ContainerId containerId, ActivityState appState, String diagnostic) {
if (shouldRecordThisApp(applicationId)) {
long currTS = SystemClock.getInstance().getTime();
- AppAllocation appAllocation = appsAllocation.remove(applicationId);
+ AppAllocation appAllocation = appsAllocation.get().remove(applicationId);
appAllocation.updateAppContainerStateAndTime(containerId, appState,
currTS, diagnostic);
- List<AppAllocation> appAllocations;
- if (completedAppAllocations.containsKey(applicationId)) {
- appAllocations = completedAppAllocations.get(applicationId);
- } else {
- appAllocations = new ArrayList<>();
- completedAppAllocations.put(applicationId, appAllocations);
+ Queue<AppAllocation> appAllocations =
+ completedAppAllocations.get(applicationId);
+ if (appAllocations == null) {
+ appAllocations = new ConcurrentLinkedQueue<>();
+ Queue<AppAllocation> curAppAllocations =
+ completedAppAllocations.putIfAbsent(applicationId, appAllocations);
+ if (curAppAllocations != null) {
+ appAllocations = curAppAllocations;
+ }
}
if (appAllocations.size() == 1000) {
- appAllocations.remove(0);
+ appAllocations.poll();
}
appAllocations.add(appAllocation);
-
- if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
- <= currTS) {
+ Long stopTime =
+ recordingAppActivitiesUntilSpecifiedTime.get(applicationId);
+ if (stopTime != null && stopTime <= currTS) {
turnOffActivityMonitoringForApp(applicationId);
}
}
@@ -292,8 +300,12 @@ public class ActivitiesManager extends AbstractService {
}
boolean shouldRecordThisApp(ApplicationId applicationId) {
+ if (recordingAppActivitiesUntilSpecifiedTime.isEmpty()
+ || appsAllocation.get().isEmpty()) {
+ return false;
+ }
return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
- && appsAllocation.containsKey(applicationId);
+ && appsAllocation.get().containsKey(applicationId);
}
boolean shouldRecordThisNode(NodeId nodeID) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
index 15850c0..1903ae7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
@@ -68,7 +68,7 @@ public class AppAllocation {
}
public String getNodeId() {
- return nodeId.toString();
+ return nodeId == null ? null : nodeId.toString();
}
public String getQueueName() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index e4c81db..2e422e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -746,6 +746,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return appActivitiesInfo;
} catch (Exception e) {
String errMessage = "Cannot find application with given appId";
+ LOG.error(errMessage, e);
return new AppActivitiesInfo(errMessage, appId);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
index 5216a21..bc81e61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -189,6 +192,55 @@ public class TestActivitiesManager {
Assert.assertEquals(1, activitiesManager.historyNodeAllocations.size());
}
+
+ /**
+ * Test recording app activities in multiple threads,
+ * only one activity info should be recorded by one of these threads.
+ */
+ @Test
+ public void testRecordingAppActivitiesInMultiThreads()
+ throws Exception {
+ Random rand = new Random();
+ // start recording activities for a random app
+ SchedulerApplicationAttempt randomApp = apps.get(rand.nextInt(NUM_APPS));
+ activitiesManager
+ .turnOnAppActivitiesRecording(randomApp.getApplicationId(), 3);
+ List<Future<Void>> futures = new ArrayList<>();
+ // generate app activities
+ int nTasks = 20;
+ for (int i=0; i<nTasks; i++) {
+ Callable<Void> task = () -> {
+ ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+ (FiCaSchedulerNode) nodes.get(0),
+ SystemClock.getInstance().getTime(), randomApp);
+ for (SchedulerNode node : nodes) {
+ ActivitiesLogger.APP
+ .recordAppActivityWithoutAllocation(activitiesManager, node,
+ randomApp, Priority.newInstance(0),
+ ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
+ ActivityState.REJECTED);
+ }
+ ActivitiesLogger.APP
+ .finishAllocatedAppAllocationRecording(activitiesManager,
+ randomApp.getApplicationId(), null, ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
+ return null;
+ };
+ futures.add(threadPoolExecutor.submit(task));
+ }
+ // Check activities for multi-nodes should be recorded only once
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ Queue<AppAllocation> appAllocations =
+ activitiesManager.completedAppAllocations
+ .get(randomApp.getApplicationId());
+ Assert.assertEquals(nTasks, appAllocations.size());
+ for(AppAllocation aa : appAllocations) {
+ Assert.assertEquals(NUM_NODES, aa.getAllocationAttempts().size());
+ }
+ }
+
/**
* Testing activities manager which can record all history information about
* node allocations.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
index 724d592..7bc8634 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
@@ -21,6 +21,7 @@ import com.google.inject.Guice;
import com.google.inject.servlet.ServletModule;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
import com.sun.jersey.test.framework.WebAppDescriptor;
import org.apache.hadoop.http.JettyUtils;
@@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
@@ -84,6 +86,17 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
// enable multi-nodes placement
conf.setBoolean(
CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true);
+ String policyName = "resource-based";
+ conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+ policyName);
+ conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+ policyName);
+ String policyConfPrefix =
+ CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + "."
+ + policyName;
+ conf.set(policyConfPrefix + ".class",
+ ResourceUsageMultiNodeLookupPolicy.class.getName());
+ conf.set(policyConfPrefix + ".sorting-interval.ms", "0");
rm = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm);
serve("/*").with(GuiceContainer.class);
@@ -204,6 +217,59 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
}
}
+ @Test
+ public void testAppAssignContainer() throws Exception {
+ rm.start();
+
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
+ MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * 1024);
+
+ try {
+ RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+ am1.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.UNDEFINED, "*", Resources.createResource(3072),
+ 1)), null);
+
+ //Trigger recording for this app
+ WebResource r = resource();
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
+ ClientResponse response = r.path("ws").path("v1").path("cluster")
+ .path("scheduler/app-activities").queryParams(params)
+ .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+ response.getType().toString());
+ JSONObject json = response.getEntity(JSONObject.class);
+ assertEquals("waiting for display", json.getString("diagnostic"));
+
+ //Trigger scheduling for this app
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+
+ //Check app activities, it should contain one allocation and
+ // final allocation state is ALLOCATED
+ response = r.path("ws").path("v1").path("cluster")
+ .path("scheduler/app-activities").queryParams(params)
+ .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+ response.getType().toString());
+ json = response.getEntity(JSONObject.class);
+
+ verifyNumberOfAllocations(json, 1);
+
+ JSONObject allocations = json.getJSONObject("allocations");
+ verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
+ JSONArray allocationAttempts =
+ allocations.getJSONArray("allocationAttempt");
+ assertEquals(2, allocationAttempts.length());
+ } finally {
+ rm.stop();
+ }
+ }
+
private void verifyNumberOfAllocations(JSONObject json, int realValue)
throws Exception {
if (json.isNull("allocations")) {
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org