You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2019/04/08 05:41:34 UTC

[hadoop] branch trunk updated: YARN-9313. Support asynchronized scheduling mode and multi-node lookup mechanism for scheduler activities. Contributed by Tao Yang.

This is an automated email from the ASF dual-hosted git repository.

wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fc05b0e  YARN-9313. Support asynchronized scheduling mode and multi-node lookup mechanism for scheduler activities. Contributed by Tao Yang.
fc05b0e is described below

commit fc05b0e70e9bb556d6bdc00fa8735e18a6f90bc9
Author: Weiwei Yang <ww...@apache.org>
AuthorDate: Mon Apr 8 13:40:53 2019 +0800

    YARN-9313. Support asynchronized scheduling mode and multi-node lookup mechanism for scheduler activities. Contributed by Tao Yang.
---
 .../scheduler/activities/ActivitiesLogger.java     |  57 +++---
 .../scheduler/activities/ActivitiesManager.java    |  63 ++++--
 .../scheduler/capacity/CapacityScheduler.java      |  13 +-
 .../activities/TestActivitiesManager.java          | 220 ++++++++++++++++++++
 .../TestRMWebServicesSchedulerActivities.java      |  16 +-
 ...esSchedulerActivitiesWithMultiNodesEnabled.java | 227 +++++++++++++++++++++
 6 files changed, 539 insertions(+), 57 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
index 9d3080e..46ca4bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -63,14 +63,14 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, SchedulerNode node,
         SchedulerApplicationAttempt application, Priority priority,
         String diagnostic) {
-      String type = "app";
-      if (node == null || activitiesManager == null) {
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+      if (nodeId == null) {
         return;
       }
-      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
-        recordActivity(activitiesManager, node, application.getQueueName(),
+      if (activitiesManager.shouldRecordThisNode(nodeId)) {
+        recordActivity(activitiesManager, nodeId, application.getQueueName(),
             application.getApplicationId().toString(), priority,
-            ActivityState.REJECTED, diagnostic, type);
+            ActivityState.REJECTED, diagnostic, "app");
       }
       finishSkippedAppAllocationRecording(activitiesManager,
           application.getApplicationId(), ActivityState.REJECTED, diagnostic);
@@ -85,18 +85,19 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, SchedulerNode node,
         SchedulerApplicationAttempt application, Priority priority,
         String diagnostic, ActivityState appState) {
-      if (node == null || activitiesManager == null) {
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+      if (nodeId == null) {
         return;
       }
-      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+      if (activitiesManager.shouldRecordThisNode(nodeId)) {
         String type = "container";
         // Add application-container activity into specific node allocation.
-        activitiesManager.addSchedulingActivityForNode(node,
+        activitiesManager.addSchedulingActivityForNode(nodeId,
             application.getApplicationId().toString(), null,
             priority.toString(), ActivityState.SKIPPED, diagnostic, type);
         type = "app";
         // Add queue-application activity into specific node allocation.
-        activitiesManager.addSchedulingActivityForNode(node,
+        activitiesManager.addSchedulingActivityForNode(nodeId,
             application.getQueueName(),
             application.getApplicationId().toString(),
             application.getPriority().toString(), ActivityState.SKIPPED,
@@ -122,20 +123,21 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, SchedulerNode node,
         SchedulerApplicationAttempt application, RMContainer updatedContainer,
         ActivityState activityState) {
-      if (node == null || activitiesManager == null) {
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+      if (nodeId == null) {
         return;
       }
-      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+      if (activitiesManager.shouldRecordThisNode(nodeId)) {
         String type = "container";
         // Add application-container activity into specific node allocation.
-        activitiesManager.addSchedulingActivityForNode(node,
+        activitiesManager.addSchedulingActivityForNode(nodeId,
             application.getApplicationId().toString(),
             updatedContainer.getContainer().toString(),
             updatedContainer.getContainer().getPriority().toString(),
             activityState, ActivityDiagnosticConstant.EMPTY, type);
         type = "app";
         // Add queue-application activity into specific node allocation.
-        activitiesManager.addSchedulingActivityForNode(node,
+        activitiesManager.addSchedulingActivityForNode(nodeId,
             application.getQueueName(),
             application.getApplicationId().toString(),
             application.getPriority().toString(), ActivityState.ACCEPTED,
@@ -161,11 +163,12 @@ public class ActivitiesLogger {
         ActivitiesManager activitiesManager, FiCaSchedulerNode node,
         long currentTime,
         SchedulerApplicationAttempt application) {
-      if (node == null || activitiesManager == null) {
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+      if (nodeId == null) {
         return;
       }
       activitiesManager
-          .startAppAllocationRecording(node.getNodeID(), currentTime,
+          .startAppAllocationRecording(nodeId, currentTime,
               application);
     }
 
@@ -211,11 +214,12 @@ public class ActivitiesLogger {
     public static void recordQueueActivity(ActivitiesManager activitiesManager,
         SchedulerNode node, String parentQueueName, String queueName,
         ActivityState state, String diagnostic) {
-      if (node == null || activitiesManager == null) {
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+      if (nodeId == null) {
         return;
       }
-      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
-        recordActivity(activitiesManager, node, parentQueueName, queueName,
+      if (activitiesManager.shouldRecordThisNode(nodeId)) {
+        recordActivity(activitiesManager, nodeId, parentQueueName, queueName,
             null, state, diagnostic, null);
       }
     }
@@ -243,11 +247,12 @@ public class ActivitiesLogger {
     public static void finishAllocatedNodeAllocation(
         ActivitiesManager activitiesManager, SchedulerNode node,
         ContainerId containerId, AllocationState containerState) {
-      if (node == null || activitiesManager == null) {
+      NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+      if (nodeId == null) {
         return;
       }
-      if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
-        activitiesManager.updateAllocationFinalState(node.getNodeID(),
+      if (activitiesManager.shouldRecordThisNode(nodeId)) {
+        activitiesManager.updateAllocationFinalState(nodeId,
             containerId, containerState);
       }
     }
@@ -277,12 +282,16 @@ public class ActivitiesLogger {
 
   // Add queue, application or container activity into specific node allocation.
   private static void recordActivity(ActivitiesManager activitiesManager,
-      SchedulerNode node, String parentName, String childName,
+      NodeId nodeId, String parentName, String childName,
       Priority priority, ActivityState state, String diagnostic, String type) {
-
-    activitiesManager.addSchedulingActivityForNode(node, parentName,
+    activitiesManager.addSchedulingActivityForNode(nodeId, parentName,
         childName, priority != null ? priority.toString() : null, state,
         diagnostic, type);
+  }
 
+  private static NodeId getRecordingNodeId(ActivitiesManager activitiesManager,
+      SchedulerNode node) {
+    return activitiesManager == null ? null :
+        activitiesManager.getRecordingNodeId(node);
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index c710ffc..740e974 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.service.AbstractService;
@@ -46,8 +47,13 @@ import java.util.ArrayList;
 public class ActivitiesManager extends AbstractService {
   private static final Logger LOG =
       LoggerFactory.getLogger(ActivitiesManager.class);
-  private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation;
-  private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
+  // An empty node ID, we use this variable as a placeholder
+  // in the activity records when recording multiple nodes assignments.
+  public static final NodeId EMPTY_NODE_ID = NodeId.newInstance("", 0);
+  private ThreadLocal<Map<NodeId, List<NodeAllocation>>>
+      recordingNodesAllocation;
+  @VisibleForTesting
+  ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
   private Set<NodeId> activeRecordedNodes;
   private ConcurrentMap<ApplicationId, Long>
       recordingAppActivitiesUntilSpecifiedTime;
@@ -63,7 +69,7 @@ public class ActivitiesManager extends AbstractService {
 
   public ActivitiesManager(RMContext rmContext) {
     super(ActivitiesManager.class.getName());
-    recordingNodesAllocation = new ConcurrentHashMap<>();
+    recordingNodesAllocation = ThreadLocal.withInitial(() -> new HashMap());
     completedNodeAllocations = new ConcurrentHashMap<>();
     appsAllocation = new ConcurrentHashMap<>();
     completedAppAllocations = new ConcurrentHashMap<>();
@@ -173,9 +179,11 @@ public class ActivitiesManager extends AbstractService {
     if (recordNextAvailableNode) {
       recordNextNodeUpdateActivities(nodeID.toString());
     }
-    if (activeRecordedNodes.contains(nodeID)) {
+    // Removing from activeRecordedNodes immediately is to ensure that
+    // activities will be recorded just once in multiple threads.
+    if (activeRecordedNodes.remove(nodeID)) {
       List<NodeAllocation> nodeAllocation = new ArrayList<>();
-      recordingNodesAllocation.put(nodeID, nodeAllocation);
+      recordingNodesAllocation.get().put(nodeID, nodeAllocation);
     }
   }
 
@@ -199,12 +207,11 @@ public class ActivitiesManager extends AbstractService {
   }
 
   // Add queue, application or container activity into specific node allocation.
-  void addSchedulingActivityForNode(SchedulerNode node, String parentName,
+  void addSchedulingActivityForNode(NodeId nodeId, String parentName,
       String childName, String priority, ActivityState state, String diagnostic,
       String type) {
-    if (shouldRecordThisNode(node.getNodeID())) {
-      NodeAllocation nodeAllocation = getCurrentNodeAllocation(
-          node.getNodeID());
+    if (shouldRecordThisNode(nodeId)) {
+      NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeId);
       nodeAllocation.addAllocationActivity(parentName, childName, priority,
           state, diagnostic, type);
     }
@@ -262,7 +269,7 @@ public class ActivitiesManager extends AbstractService {
   }
 
   void finishNodeUpdateRecording(NodeId nodeID) {
-    List<NodeAllocation> value = recordingNodesAllocation.get(nodeID);
+    List<NodeAllocation> value = recordingNodesAllocation.get().get(nodeID);
     long timeStamp = SystemClock.getInstance().getTime();
 
     if (value != null) {
@@ -278,9 +285,8 @@ public class ActivitiesManager extends AbstractService {
       }
 
       if (shouldRecordThisNode(nodeID)) {
-        recordingNodesAllocation.remove(nodeID);
+        recordingNodesAllocation.get().remove(nodeID);
         completedNodeAllocations.put(nodeID, value);
-        stopRecordNodeUpdateActivities(nodeID);
       }
     }
   }
@@ -291,12 +297,15 @@ public class ActivitiesManager extends AbstractService {
   }
 
   boolean shouldRecordThisNode(NodeId nodeID) {
-    return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation
+    return isRecordingMultiNodes() || recordingNodesAllocation.get()
         .containsKey(nodeID);
   }
 
   private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) {
-    List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID);
+    NodeId recordingKey =
+        isRecordingMultiNodes() ? EMPTY_NODE_ID : nodeID;
+    List<NodeAllocation> nodeAllocations =
+        recordingNodesAllocation.get().get(recordingKey);
     NodeAllocation nodeAllocation;
     // When this node has already stored allocation activities, get the
     // last allocation for this node.
@@ -323,11 +332,29 @@ public class ActivitiesManager extends AbstractService {
     return nodeAllocation;
   }
 
-  private void stopRecordNodeUpdateActivities(NodeId nodeId) {
-    activeRecordedNodes.remove(nodeId);
-  }
-
   private void turnOffActivityMonitoringForApp(ApplicationId applicationId) {
     recordingAppActivitiesUntilSpecifiedTime.remove(applicationId);
   }
+
+  public boolean isRecordingMultiNodes() {
+    return recordingNodesAllocation.get().containsKey(EMPTY_NODE_ID);
+  }
+
+  /**
+   * Get recording node id:
+   * 1. node id of the input node if it is not null.
+   * 2. EMPTY_NODE_ID if input node is null and activities manager is
+   *    recording multi-nodes.
+   * 3. null otherwise.
+   * @param node - input node
+   * @return recording nodeId
+   */
+  public NodeId getRecordingNodeId(SchedulerNode node) {
+    if (node != null) {
+      return node.getNodeID();
+    } else if (isRecordingMultiNodes()) {
+      return ActivitiesManager.EMPTY_NODE_ID;
+    }
+    return null;
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 4baf405..b8fdd42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1297,17 +1297,12 @@ public class CapacityScheduler extends
     if (!scheduleAsynchronously) {
       writeLock.lock();
       try {
-        ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
-            rmNode.getNodeID());
-
         // reset allocation and reservation stats before we start doing any
         // work
         updateSchedulerHealth(lastNodeUpdateTime, rmNode.getNodeID(),
             CSAssignment.NULL_ASSIGNMENT);
 
         allocateContainersToNode(rmNode.getNodeID(), true);
-        ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
-            rmNode.getNodeID());
       } finally {
         writeLock.unlock();
       }
@@ -1706,10 +1701,18 @@ public class CapacityScheduler extends
     // nodes.
     CSAssignment assignment;
     if (!multiNodePlacementEnabled) {
+      ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+          node.getNodeID());
       assignment = allocateContainerOnSingleNode(candidates,
           node, withNodeHeartbeat);
+      ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+          node.getNodeID());
     } else{
+      ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+          ActivitiesManager.EMPTY_NODE_ID);
       assignment = allocateContainersOnMultiNodes(candidates);
+      ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+          ActivitiesManager.EMPTY_NODE_ID);
     }
 
     if (assignment != null && assignment.getAssignmentInformation() != null
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
new file mode 100644
index 0000000..5216a21
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test class for {@link ActivitiesManager}.
+ */
+public class TestActivitiesManager {
+
+  private final static int NUM_NODES = 5;
+
+  private final static int NUM_APPS = 5;
+
+  private final static int NUM_THREADS = 5;
+
+  private RMContext rmContext;
+
+  private TestingActivitiesManager activitiesManager;
+
+  private List<SchedulerApplicationAttempt> apps;
+
+  private List<SchedulerNode> nodes;
+
+  private ThreadPoolExecutor threadPoolExecutor;
+
+  @Before
+  public void setup() {
+    rmContext = Mockito.mock(RMContext.class);
+    ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
+    Mockito.when(scheduler.getMinimumResourceCapability())
+        .thenReturn(Resources.none());
+    Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
+    LeafQueue mockQueue = Mockito.mock(LeafQueue.class);
+    Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
+    Mockito.doReturn(rmApps).when(rmContext).getRMApps();
+    apps = new ArrayList<>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      ApplicationAttemptId appAttemptId =
+          TestUtils.getMockApplicationAttemptId(i, 0);
+      RMApp mockApp = Mockito.mock(RMApp.class);
+      Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp)
+          .getApplicationId();
+      rmApps.put(appAttemptId.getApplicationId(), mockApp);
+      FiCaSchedulerApp app =
+          new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
+              mock(ActiveUsersManager.class), rmContext);
+      apps.add(app);
+    }
+    nodes = new ArrayList<>();
+    for (int i = 0; i < NUM_NODES; i++) {
+      nodes.add(TestUtils.getMockNode("host" + i, "rack", 1, 10240));
+    }
+    activitiesManager = new TestingActivitiesManager(rmContext);
+    threadPoolExecutor =
+        new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 3L,
+            TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+  }
+
+  /**
+   * Test recording activities belong to different nodes in multiple threads,
+   * these threads can run without interference and one activity
+   * should be recorded by every thread.
+   */
+  @Test
+  public void testRecordingDifferentNodeActivitiesInMultiThreads()
+      throws Exception {
+    Random rand = new Random();
+    List<Future<Void>> futures = new ArrayList<>();
+    for (SchedulerNode node : nodes) {
+      Callable<Void> task = () -> {
+        SchedulerApplicationAttempt randomApp =
+            apps.get(rand.nextInt(NUM_APPS));
+        // start recording activities for random node
+        activitiesManager.recordNextNodeUpdateActivities(
+            node.getNodeID().toString());
+        // generate node/app activities
+        ActivitiesLogger.NODE
+            .startNodeUpdateRecording(activitiesManager, node.getNodeID());
+        ActivitiesLogger.APP
+            .recordAppActivityWithoutAllocation(activitiesManager, node,
+                randomApp, Priority.newInstance(0),
+                ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
+                ActivityState.REJECTED);
+        ActivitiesLogger.NODE
+            .finishNodeUpdateRecording(activitiesManager, node.getNodeID());
+        return null;
+      };
+      futures.add(threadPoolExecutor.submit(task));
+    }
+    for (Future<Void> future : futures) {
+      future.get();
+    }
+    // Check activities for all nodes should be recorded and every node should
+    // have only one allocation information.
+    Assert.assertEquals(NUM_NODES,
+        activitiesManager.historyNodeAllocations.size());
+    for (List<List<NodeAllocation>> nodeAllocationsForThisNode :
+        activitiesManager.historyNodeAllocations.values()) {
+      Assert.assertEquals(1, nodeAllocationsForThisNode.size());
+      Assert.assertEquals(1, nodeAllocationsForThisNode.get(0).size());
+    }
+  }
+
+  /**
+   * Test recording activities for multi-nodes assignment in multiple threads,
+   * only one activity info should be recorded by one of these threads.
+   */
+  @Test
+  public void testRecordingSchedulerActivitiesForMultiNodesInMultiThreads()
+      throws Exception {
+    Random rand = new Random();
+    // start recording activities for multi-nodes
+    activitiesManager.recordNextNodeUpdateActivities(
+        ActivitiesManager.EMPTY_NODE_ID.toString());
+    List<Future<Void>> futures = new ArrayList<>();
+    // generate node/app activities
+    for (SchedulerNode node : nodes) {
+      Callable<Void> task = () -> {
+        SchedulerApplicationAttempt randomApp =
+            apps.get(rand.nextInt(NUM_APPS));
+        ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+            ActivitiesManager.EMPTY_NODE_ID);
+        ActivitiesLogger.APP
+            .recordAppActivityWithoutAllocation(activitiesManager, node,
+                randomApp, Priority.newInstance(0),
+                ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
+                ActivityState.REJECTED);
+        ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+            ActivitiesManager.EMPTY_NODE_ID);
+        return null;
+      };
+      futures.add(threadPoolExecutor.submit(task));
+    }
+    for (Future<Void> future : futures) {
+      future.get();
+    }
+    // Check activities for multi-nodes should be recorded only once
+    Assert.assertEquals(1, activitiesManager.historyNodeAllocations.size());
+  }
+
+  /**
+   * Testing activities manager which can record all history information about
+   * node allocations.
+   */
+  public class TestingActivitiesManager extends ActivitiesManager {
+
+    private Map<NodeId, List<List<NodeAllocation>>> historyNodeAllocations =
+        new ConcurrentHashMap<>();
+
+    public TestingActivitiesManager(RMContext rmContext) {
+      super(rmContext);
+      super.completedNodeAllocations = Mockito.spy(new ConcurrentHashMap<>());
+      Mockito.doAnswer((invocationOnMock) -> {
+        NodeId nodeId = (NodeId) invocationOnMock.getArguments()[0];
+        List<NodeAllocation> nodeAllocations =
+            (List<NodeAllocation>) invocationOnMock.getArguments()[1];
+        List<List<NodeAllocation>> historyAllocationsForThisNode =
+            historyNodeAllocations.get(nodeId);
+        if (historyAllocationsForThisNode == null) {
+          historyAllocationsForThisNode = new ArrayList<>();
+          historyNodeAllocations.put(nodeId, historyAllocationsForThisNode);
+        }
+        historyAllocationsForThisNode.add(nodeAllocations);
+        return null;
+      }).when(completedNodeAllocations).put(any(NodeId.class),
+          any(List.class));
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
index 91b92de..932f58d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
@@ -95,16 +95,12 @@ public class TestRMWebServicesSchedulerActivities
           response.getType().toString());
       json = response.getEntity(JSONObject.class);
 
-      verifyNumberOfAllocations(json, 11);
-
-      JSONArray allocations = json.getJSONArray("allocations");
-      for (int i = 0; i < allocations.length(); i++) {
-        if (i != allocations.length() - 1) {
-          verifyStateOfAllocations(allocations.getJSONObject(i),
-              "finalAllocationState", "ALLOCATED");
-          verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1");
-        }
-      }
+      // Collection logic of scheduler activities changed after YARN-9313,
+      // only one allocation should be recorded for all scenarios.
+      verifyNumberOfAllocations(json, 1);
+      verifyStateOfAllocations(json.getJSONObject("allocations"),
+          "finalAllocationState", "ALLOCATED");
+      verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b2-b3-b1");
     }
     finally {
       rm.stop();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
new file mode 100644
index 0000000..724d592
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Guice;
+import com.google.inject.servlet.ServletModule;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import org.apache.hadoop.http.JettyUtils;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
+import org.apache.hadoop.yarn.webapp.JerseyTestBase;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for scheduler/app activities when multi-nodes enabled.
+ */
+public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
+    extends JerseyTestBase {
+
+  private static MockRM rm;
+  private static CapacitySchedulerConfiguration csConf;
+  private static YarnConfiguration conf;
+
+  public TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled() {
+    super(new WebAppDescriptor.Builder(
+        "org.apache.hadoop.yarn.server.resourcemanager.webapp")
+        .contextListenerClass(GuiceServletConfig.class)
+        .filterClass(com.google.inject.servlet.GuiceFilter.class)
+        .contextPath("jersey-guice-filter").servletPath("/").build());
+  }
+
+  private static class WebServletModule extends ServletModule {
+    @Override
+    protected void configureServlets() {
+      bind(JAXBContextResolver.class);
+      bind(RMWebServices.class);
+      bind(GenericExceptionHandler.class);
+      csConf = new CapacitySchedulerConfiguration();
+      setupQueueConfiguration(csConf);
+
+      conf = new YarnConfiguration(csConf);
+      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+          ResourceScheduler.class);
+      // enable multi-nodes placement
+      conf.setBoolean(
+          CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true);
+      rm = new MockRM(conf);
+      bind(ResourceManager.class).toInstance(rm);
+      serve("/*").with(GuiceContainer.class);
+    }
+  }
+
+  private static void setupQueueConfiguration(
+      CapacitySchedulerConfiguration config) {
+    // Define top-level queues
+    config.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    config.setCapacity(queueA, 10.5f);
+    config.setMaximumCapacity(queueA, 50);
+
+    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+    config.setCapacity(queueB, 89.5f);
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    GuiceServletConfig.setInjector(
+        Guice.createInjector(new WebServletModule()));
+  }
+
+  @Test (timeout=30000)
+  public void testAssignContainer() throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 2 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 1), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 1), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              1)), null);
+
+      //Trigger recording for multi-nodes without params
+      WebResource r = resource();
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+          response.getType().toString());
+      //Trigger scheduling for this app
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+
+      //Check scheduler activities, it should contain one allocation and
+      // final allocation state is ALLOCATED
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+          response.getType().toString());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations,
+          "finalAllocationState", "ALLOCATED");
+    } finally {
+      rm.stop();
+    }
+  }
+
+  @Test (timeout=30000)
+  public void testSchedulingWithoutPendingRequests()
+      throws Exception {
+    //Start RM so that it accepts app submissions
+    rm.start();
+
+    MockNM nm = new MockNM("127.0.0.1:1234", 8 * 1024,
+        rm.getResourceTrackerService());
+    nm.registerNode();
+
+    try {
+      //Trigger recording for multi-nodes without params
+      WebResource r = resource();
+      ClientResponse response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+          response.getType().toString());
+      //Trigger scheduling for this app
+      CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+      RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+
+      //Check scheduler activities, it should contain one allocation and
+      // final allocation state is SKIPPED
+      response = r.path("ws").path("v1").path("cluster").path(
+          "scheduler/activities").accept(
+          MediaType.APPLICATION_JSON).get(ClientResponse.class);
+      assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+          response.getType().toString());
+      JSONObject json = response.getEntity(JSONObject.class);
+
+      verifyNumberOfAllocations(json, 1);
+      JSONObject allocations = json.getJSONObject("allocations");
+      verifyStateOfAllocations(allocations,
+          "finalAllocationState", "SKIPPED");
+    } finally {
+      rm.stop();
+    }
+  }
+
+  private void verifyNumberOfAllocations(JSONObject json, int realValue)
+      throws Exception {
+    if (json.isNull("allocations")) {
+      assertEquals("Number of allocations is wrong", 0, realValue);
+    } else {
+      Object object = json.get("allocations");
+      if (object.getClass() == JSONObject.class) {
+        assertEquals("Number of allocations is wrong", 1, realValue);
+      } else if (object.getClass() == JSONArray.class) {
+        assertEquals("Number of allocations is wrong in: " + object,
+            ((JSONArray) object).length(), realValue);
+      }
+    }
+  }
+
+  private void verifyStateOfAllocations(JSONObject allocation,
+      String nameToCheck, String realState) throws Exception {
+    assertEquals("State of allocation is wrong", allocation.get(nameToCheck),
+        realState);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org