You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2019/04/15 16:20:22 UTC

[hadoop] branch trunk updated: YARN-9439. Support asynchronized scheduling mode and multi-node lookup mechanism for app activities. Contributed by Tao Yang.

This is an automated email from the ASF dual-hosted git repository.

wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7fa73fa  YARN-9439. Support asynchronized scheduling mode and multi-node lookup mechanism for app activities. Contributed by Tao Yang.
7fa73fa is described below

commit 7fa73fac2676875561269e9408215e012269a18c
Author: Weiwei Yang <ww...@apache.org>
AuthorDate: Tue Apr 16 00:12:43 2019 +0800

    YARN-9439. Support asynchronized scheduling mode and multi-node lookup mechanism for app activities. Contributed by Tao Yang.
---
 .../scheduler/activities/ActivitiesLogger.java     | 20 ++---
 .../scheduler/activities/ActivitiesManager.java    | 86 ++++++++++++----------
 .../scheduler/activities/AppAllocation.java        |  2 +-
 .../resourcemanager/webapp/RMWebServices.java      |  1 +
 .../activities/TestActivitiesManager.java          | 52 +++++++++++++
 ...esSchedulerActivitiesWithMultiNodesEnabled.java | 66 +++++++++++++++++
 6 files changed, 179 insertions(+), 48 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
index 46ca4bd..7801109 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -63,10 +63,10 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, SchedulerNode node,
         SchedulerApplicationAttempt application, Priority priority,
         String diagnostic) {
-      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
-      if (nodeId == null) {
+      if (activitiesManager == null) {
         return;
       }
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
       if (activitiesManager.shouldRecordThisNode(nodeId)) {
         recordActivity(activitiesManager, nodeId, application.getQueueName(),
             application.getApplicationId().toString(), priority,
@@ -85,10 +85,10 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, SchedulerNode node,
         SchedulerApplicationAttempt application, Priority priority,
         String diagnostic, ActivityState appState) {
-      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
-      if (nodeId == null) {
+      if (activitiesManager == null) {
         return;
       }
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
       if (activitiesManager.shouldRecordThisNode(nodeId)) {
         String type = "container";
         // Add application-container activity into specific node allocation.
@@ -123,10 +123,10 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, SchedulerNode node,
         SchedulerApplicationAttempt application, RMContainer updatedContainer,
         ActivityState activityState) {
-      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
-      if (nodeId == null) {
+      if (activitiesManager == null) {
         return;
       }
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
       if (activitiesManager.shouldRecordThisNode(nodeId)) {
         String type = "container";
         // Add application-container activity into specific node allocation.
@@ -163,10 +163,10 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, FiCaSchedulerNode node,
         long currentTime,
         SchedulerApplicationAttempt application) {
-      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
-      if (nodeId == null) {
+      if (activitiesManager == null) {
         return;
       }
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
       activitiesManager
           .startAppAllocationRecording(nodeId, currentTime,
               application);
@@ -214,10 +214,10 @@ public class ActivitiesLogger {
     public static void recordQueueActivity(ActivitiesManager activitiesManager,
         SchedulerNode node, String parentQueueName, String queueName,
         ActivityState state, String diagnostic) {
-      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
-      if (nodeId == null) {
+      if (activitiesManager == null) {
         return;
       }
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
       if (activitiesManager.shouldRecordThisNode(nodeId)) {
         recordActivity(activitiesManager, nodeId, parentQueueName, queueName,
             null, state, diagnostic, null);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index 740e974..99ee48ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInf
 import org.apache.hadoop.yarn.util.SystemClock;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.List;
 import java.util.Set;
@@ -57,9 +59,10 @@ public class ActivitiesManager extends AbstractService {
   private Set<NodeId> activeRecordedNodes;
   private ConcurrentMap<ApplicationId, Long>
       recordingAppActivitiesUntilSpecifiedTime;
-  private ConcurrentMap<ApplicationId, AppAllocation> appsAllocation;
-  private ConcurrentMap<ApplicationId, List<AppAllocation>>
-      completedAppAllocations;
+  private ThreadLocal<Map<ApplicationId, AppAllocation>>
+      appsAllocation;
+  @VisibleForTesting
+  ConcurrentMap<ApplicationId, Queue<AppAllocation>> completedAppAllocations;
   private boolean recordNextAvailableNode = false;
   private List<NodeAllocation> lastAvailableNodeActivities = null;
   private Thread cleanUpThread;
@@ -71,7 +74,7 @@ public class ActivitiesManager extends AbstractService {
     super(ActivitiesManager.class.getName());
     recordingNodesAllocation = ThreadLocal.withInitial(() -> new HashMap());
     completedNodeAllocations = new ConcurrentHashMap<>();
-    appsAllocation = new ConcurrentHashMap<>();
+    appsAllocation = ThreadLocal.withInitial(() -> new HashMap());
     completedAppAllocations = new ConcurrentHashMap<>();
     activeRecordedNodes = Collections.newSetFromMap(new ConcurrentHashMap<>());
     recordingAppActivitiesUntilSpecifiedTime = new ConcurrentHashMap<>();
@@ -79,11 +82,15 @@ public class ActivitiesManager extends AbstractService {
   }
 
   public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId) {
-    if (rmContext.getRMApps().get(applicationId).getFinalApplicationStatus()
+    RMApp app = rmContext.getRMApps().get(applicationId);
+    if (app != null && app.getFinalApplicationStatus()
         == FinalApplicationStatus.UNDEFINED) {
-      List<AppAllocation> allocations = completedAppAllocations.get(
-          applicationId);
-
+      Queue<AppAllocation> curAllocations =
+          completedAppAllocations.get(applicationId);
+      List<AppAllocation> allocations = null;
+      if (curAllocations != null) {
+        allocations = new ArrayList(curAllocations);
+      }
       return new AppActivitiesInfo(allocations, applicationId);
     } else {
       return new AppActivitiesInfo(
@@ -135,13 +142,13 @@ public class ActivitiesManager extends AbstractService {
             }
           }
 
-          Iterator<Map.Entry<ApplicationId, List<AppAllocation>>> iteApp =
+          Iterator<Map.Entry<ApplicationId, Queue<AppAllocation>>> iteApp =
               completedAppAllocations.entrySet().iterator();
           while (iteApp.hasNext()) {
-            Map.Entry<ApplicationId, List<AppAllocation>> appAllocation =
+            Map.Entry<ApplicationId, Queue<AppAllocation>> appAllocation =
                 iteApp.next();
-            if (rmContext.getRMApps().get(appAllocation.getKey())
-                .getFinalApplicationStatus()
+            RMApp rmApp = rmContext.getRMApps().get(appAllocation.getKey());
+            if (rmApp == null || rmApp.getFinalApplicationStatus()
                 != FinalApplicationStatus.UNDEFINED) {
               iteApp.remove();
             }
@@ -191,18 +198,16 @@ public class ActivitiesManager extends AbstractService {
       SchedulerApplicationAttempt application) {
     ApplicationId applicationId = application.getApplicationId();
 
-    if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
-        && recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
-        > currTS) {
-      appsAllocation.put(applicationId,
-          new AppAllocation(application.getPriority(), nodeID,
-              application.getQueueName()));
-    }
-
-    if (recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
-        && recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
-        <= currTS) {
-      turnOffActivityMonitoringForApp(applicationId);
+    Long turnOffTimestamp =
+        recordingAppActivitiesUntilSpecifiedTime.get(applicationId);
+    if (turnOffTimestamp != null) {
+      if (turnOffTimestamp > currTS) {
+        appsAllocation.get().put(applicationId,
+            new AppAllocation(application.getPriority(), nodeID,
+                application.getQueueName()));
+      } else {
+        turnOffActivityMonitoringForApp(applicationId);
+      }
     }
   }
 
@@ -223,7 +228,7 @@ public class ActivitiesManager extends AbstractService {
       ContainerId containerId, String priority, ActivityState state,
       String diagnostic, String type) {
     if (shouldRecordThisApp(applicationId)) {
-      AppAllocation appAllocation = appsAllocation.get(applicationId);
+      AppAllocation appAllocation = appsAllocation.get().get(applicationId);
       appAllocation.addAppAllocationActivity(containerId == null ?
           "Container-Id-Not-Assigned" :
           containerId.toString(), priority, state, diagnostic, type);
@@ -245,24 +250,27 @@ public class ActivitiesManager extends AbstractService {
       ContainerId containerId, ActivityState appState, String diagnostic) {
     if (shouldRecordThisApp(applicationId)) {
       long currTS = SystemClock.getInstance().getTime();
-      AppAllocation appAllocation = appsAllocation.remove(applicationId);
+      AppAllocation appAllocation = appsAllocation.get().remove(applicationId);
       appAllocation.updateAppContainerStateAndTime(containerId, appState,
           currTS, diagnostic);
 
-      List<AppAllocation> appAllocations;
-      if (completedAppAllocations.containsKey(applicationId)) {
-        appAllocations = completedAppAllocations.get(applicationId);
-      } else {
-        appAllocations = new ArrayList<>();
-        completedAppAllocations.put(applicationId, appAllocations);
+      Queue<AppAllocation> appAllocations =
+          completedAppAllocations.get(applicationId);
+      if (appAllocations == null) {
+        appAllocations = new ConcurrentLinkedQueue<>();
+        Queue<AppAllocation> curAppAllocations =
+            completedAppAllocations.putIfAbsent(applicationId, appAllocations);
+        if (curAppAllocations != null) {
+          appAllocations = curAppAllocations;
+        }
       }
       if (appAllocations.size() == 1000) {
-        appAllocations.remove(0);
+        appAllocations.poll();
       }
       appAllocations.add(appAllocation);
-
-      if (recordingAppActivitiesUntilSpecifiedTime.get(applicationId)
-          <= currTS) {
+      Long stopTime =
+          recordingAppActivitiesUntilSpecifiedTime.get(applicationId);
+      if (stopTime != null && stopTime <= currTS) {
         turnOffActivityMonitoringForApp(applicationId);
       }
     }
@@ -292,8 +300,12 @@ public class ActivitiesManager extends AbstractService {
   }
 
   boolean shouldRecordThisApp(ApplicationId applicationId) {
+    if (recordingAppActivitiesUntilSpecifiedTime.isEmpty()
+        || appsAllocation.get().isEmpty()) {
+      return false;
+    }
     return recordingAppActivitiesUntilSpecifiedTime.containsKey(applicationId)
-        && appsAllocation.containsKey(applicationId);
+        && appsAllocation.get().containsKey(applicationId);
   }
 
   boolean shouldRecordThisNode(NodeId nodeID) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
index 15850c0..1903ae7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
@@ -68,7 +68,7 @@ public class AppAllocation {
   }
 
   public String getNodeId() {
-    return nodeId.toString();
+    return nodeId == null ? null : nodeId.toString();
   }
 
   public String getQueueName() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index e4c81db..2e422e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -746,6 +746,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
         return appActivitiesInfo;
       } catch (Exception e) {
         String errMessage = "Cannot find application with given appId";
+        LOG.error(errMessage, e);
         return new AppActivitiesInfo(errMessage, appId);
       }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
index 5216a21..bc81e61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -189,6 +192,55 @@ public class TestActivitiesManager {
     Assert.assertEquals(1, activitiesManager.historyNodeAllocations.size());
   }
 
+
+  /**
+   * Test recording app activities in multiple threads,
+   * each task should record exactly one app allocation.
+   */
+  @Test
+  public void testRecordingAppActivitiesInMultiThreads()
+      throws Exception {
+    Random rand = new Random();
+    // start recording activities for a random app
+    SchedulerApplicationAttempt randomApp = apps.get(rand.nextInt(NUM_APPS));
+    activitiesManager
+        .turnOnAppActivitiesRecording(randomApp.getApplicationId(), 3);
+    List<Future<Void>> futures = new ArrayList<>();
+    // generate app activities
+    int nTasks = 20;
+    for (int i=0; i<nTasks; i++) {
+      Callable<Void> task = () -> {
+        ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
+            (FiCaSchedulerNode) nodes.get(0),
+            SystemClock.getInstance().getTime(), randomApp);
+        for (SchedulerNode node : nodes) {
+          ActivitiesLogger.APP
+              .recordAppActivityWithoutAllocation(activitiesManager, node,
+                  randomApp, Priority.newInstance(0),
+                  ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
+                  ActivityState.REJECTED);
+        }
+        ActivitiesLogger.APP
+            .finishAllocatedAppAllocationRecording(activitiesManager,
+                randomApp.getApplicationId(), null, ActivityState.SKIPPED,
+                ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
+        return null;
+      };
+      futures.add(threadPoolExecutor.submit(task));
+    }
+    // Wait for all tasks; each should record exactly one app allocation
+    for (Future<Void> future : futures) {
+      future.get();
+    }
+    Queue<AppAllocation> appAllocations =
+        activitiesManager.completedAppAllocations
+            .get(randomApp.getApplicationId());
+    Assert.assertEquals(nTasks, appAllocations.size());
+    for(AppAllocation aa : appAllocations) {
+      Assert.assertEquals(NUM_NODES, aa.getAllocationAttempts().size());
+    }
+  }
+
   /**
    * Testing activities manager which can record all history information about
    * node allocations.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
index 724d592..7bc8634 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
@@ -21,6 +21,7 @@ import com.google.inject.Guice;
 import com.google.inject.servlet.ServletModule;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
 import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
 import com.sun.jersey.test.framework.WebAppDescriptor;
 import org.apache.hadoop.http.JettyUtils;
@@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceUsageMultiNodeLookupPolicy;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
@@ -84,6 +86,17 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
       // enable multi-nodes placement
       conf.setBoolean(
           CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true);
+      String policyName = "resource-based";
+      conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICIES,
+          policyName);
+      conf.set(CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME,
+          policyName);
+      String policyConfPrefix =
+          CapacitySchedulerConfiguration.MULTI_NODE_SORTING_POLICY_NAME + "."
+              + policyName;
+      conf.set(policyConfPrefix + ".class",
+          ResourceUsageMultiNodeLookupPolicy.class.getName());
+      conf.set(policyConfPrefix + ".sorting-interval.ms", "0");
       rm = new MockRM(conf);
       bind(ResourceManager.class).toInstance(rm);
       serve("/*").with(GuiceContainer.class);
@@ -204,6 +217,59 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
     }
   }
 
+  @Test
+  public void testAppAssignContainer() throws Exception {
+    rm.start();
+
+    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
+    MockNM nm2 = rm.registerNode("127.0.0.2:1234", 2 * 1024);
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(3072),
+              1)), null);
+
+      //Trigger recording for this app
+      WebResource r = resource();
+      MultivaluedMapImpl params = new MultivaluedMapImpl();
+      params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
+      ClientResponse response = r.path("ws").path("v1").path("cluster")
+          .path("scheduler/app-activities").queryParams(params)
+          .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+          response.getType().toString());
+      JSONObject json = response.getEntity(JSONObject.class);
+      assertEquals("waiting for display", json.getString("diagnostic"));
+
+      //Trigger scheduling for this app
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      RMNode rmNode = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+
+      //Check app activities, it should contain one allocation and
+      // final allocation state is ACCEPTED
+      response = r.path("ws").path("v1").path("cluster")
+          .path("scheduler/app-activities").queryParams(params)
+          .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+          response.getType().toString());
+      json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations, "allocationState", "ACCEPTED");
+      JSONArray allocationAttempts =
+          allocations.getJSONArray("allocationAttempt");
+      assertEquals(2, allocationAttempts.length());
+    } finally {
+      rm.stop();
+    }
+  }
+
   private void verifyNumberOfAllocations(JSONObject json, int realValue)
       throws Exception {
     if (json.isNull("allocations")) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org