You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2019/04/08 05:41:34 UTC
[hadoop] branch trunk updated: YARN-9313. Support asynchronized
scheduling mode and multi-node lookup mechanism for scheduler activities.
Contributed by Tao Yang.
This is an automated email from the ASF dual-hosted git repository.
wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new fc05b0e YARN-9313. Support asynchronized scheduling mode and multi-node lookup mechanism for scheduler activities. Contributed by Tao Yang.
fc05b0e is described below
commit fc05b0e70e9bb556d6bdc00fa8735e18a6f90bc9
Author: Weiwei Yang <ww...@apache.org>
AuthorDate: Mon Apr 8 13:40:53 2019 +0800
YARN-9313. Support asynchronized scheduling mode and multi-node lookup mechanism for scheduler activities. Contributed by Tao Yang.
---
.../scheduler/activities/ActivitiesLogger.java | 57 +++---
.../scheduler/activities/ActivitiesManager.java | 63 ++++--
.../scheduler/capacity/CapacityScheduler.java | 13 +-
.../activities/TestActivitiesManager.java | 220 ++++++++++++++++++++
.../TestRMWebServicesSchedulerActivities.java | 16 +-
...esSchedulerActivitiesWithMultiNodesEnabled.java | 227 +++++++++++++++++++++
6 files changed, 539 insertions(+), 57 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
index 9d3080e..46ca4bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesLogger.java
@@ -63,14 +63,14 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic) {
- String type = "app";
- if (node == null || activitiesManager == null) {
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+ if (nodeId == null) {
return;
}
- if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
- recordActivity(activitiesManager, node, application.getQueueName(),
+ if (activitiesManager.shouldRecordThisNode(nodeId)) {
+ recordActivity(activitiesManager, nodeId, application.getQueueName(),
application.getApplicationId().toString(), priority,
- ActivityState.REJECTED, diagnostic, type);
+ ActivityState.REJECTED, diagnostic, "app");
}
finishSkippedAppAllocationRecording(activitiesManager,
application.getApplicationId(), ActivityState.REJECTED, diagnostic);
@@ -85,18 +85,19 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, Priority priority,
String diagnostic, ActivityState appState) {
- if (node == null || activitiesManager == null) {
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+ if (nodeId == null) {
return;
}
- if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+ if (activitiesManager.shouldRecordThisNode(nodeId)) {
String type = "container";
// Add application-container activity into specific node allocation.
- activitiesManager.addSchedulingActivityForNode(node,
+ activitiesManager.addSchedulingActivityForNode(nodeId,
application.getApplicationId().toString(), null,
priority.toString(), ActivityState.SKIPPED, diagnostic, type);
type = "app";
// Add queue-application activity into specific node allocation.
- activitiesManager.addSchedulingActivityForNode(node,
+ activitiesManager.addSchedulingActivityForNode(nodeId,
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.SKIPPED,
@@ -122,20 +123,21 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, SchedulerNode node,
SchedulerApplicationAttempt application, RMContainer updatedContainer,
ActivityState activityState) {
- if (node == null || activitiesManager == null) {
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+ if (nodeId == null) {
return;
}
- if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
+ if (activitiesManager.shouldRecordThisNode(nodeId)) {
String type = "container";
// Add application-container activity into specific node allocation.
- activitiesManager.addSchedulingActivityForNode(node,
+ activitiesManager.addSchedulingActivityForNode(nodeId,
application.getApplicationId().toString(),
updatedContainer.getContainer().toString(),
updatedContainer.getContainer().getPriority().toString(),
activityState, ActivityDiagnosticConstant.EMPTY, type);
type = "app";
// Add queue-application activity into specific node allocation.
- activitiesManager.addSchedulingActivityForNode(node,
+ activitiesManager.addSchedulingActivityForNode(nodeId,
application.getQueueName(),
application.getApplicationId().toString(),
application.getPriority().toString(), ActivityState.ACCEPTED,
@@ -161,11 +163,12 @@ public class ActivitiesLogger {
ActivitiesManager activitiesManager, FiCaSchedulerNode node,
long currentTime,
SchedulerApplicationAttempt application) {
- if (node == null || activitiesManager == null) {
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+ if (nodeId == null) {
return;
}
activitiesManager
- .startAppAllocationRecording(node.getNodeID(), currentTime,
+ .startAppAllocationRecording(nodeId, currentTime,
application);
}
@@ -211,11 +214,12 @@ public class ActivitiesLogger {
public static void recordQueueActivity(ActivitiesManager activitiesManager,
SchedulerNode node, String parentQueueName, String queueName,
ActivityState state, String diagnostic) {
- if (node == null || activitiesManager == null) {
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+ if (nodeId == null) {
return;
}
- if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
- recordActivity(activitiesManager, node, parentQueueName, queueName,
+ if (activitiesManager.shouldRecordThisNode(nodeId)) {
+ recordActivity(activitiesManager, nodeId, parentQueueName, queueName,
null, state, diagnostic, null);
}
}
@@ -243,11 +247,12 @@ public class ActivitiesLogger {
public static void finishAllocatedNodeAllocation(
ActivitiesManager activitiesManager, SchedulerNode node,
ContainerId containerId, AllocationState containerState) {
- if (node == null || activitiesManager == null) {
+ NodeId nodeId = getRecordingNodeId(activitiesManager, node);
+ if (nodeId == null) {
return;
}
- if (activitiesManager.shouldRecordThisNode(node.getNodeID())) {
- activitiesManager.updateAllocationFinalState(node.getNodeID(),
+ if (activitiesManager.shouldRecordThisNode(nodeId)) {
+ activitiesManager.updateAllocationFinalState(nodeId,
containerId, containerState);
}
}
@@ -277,12 +282,16 @@ public class ActivitiesLogger {
// Add queue, application or container activity into specific node allocation.
private static void recordActivity(ActivitiesManager activitiesManager,
- SchedulerNode node, String parentName, String childName,
+ NodeId nodeId, String parentName, String childName,
Priority priority, ActivityState state, String diagnostic, String type) {
-
- activitiesManager.addSchedulingActivityForNode(node, parentName,
+ activitiesManager.addSchedulingActivityForNode(nodeId, parentName,
childName, priority != null ? priority.toString() : null, state,
diagnostic, type);
+ }
+ private static NodeId getRecordingNodeId(ActivitiesManager activitiesManager,
+ SchedulerNode node) {
+ return activitiesManager == null ? null :
+ activitiesManager.getRecordingNodeId(node);
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index c710ffc..740e974 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.service.AbstractService;
@@ -46,8 +47,13 @@ import java.util.ArrayList;
public class ActivitiesManager extends AbstractService {
private static final Logger LOG =
LoggerFactory.getLogger(ActivitiesManager.class);
- private ConcurrentMap<NodeId, List<NodeAllocation>> recordingNodesAllocation;
- private ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
+ // An empty node ID, used as a placeholder key in the activity
+ // records when recording multi-node assignments.
+ public static final NodeId EMPTY_NODE_ID = NodeId.newInstance("", 0);
+ private ThreadLocal<Map<NodeId, List<NodeAllocation>>>
+ recordingNodesAllocation;
+ @VisibleForTesting
+ ConcurrentMap<NodeId, List<NodeAllocation>> completedNodeAllocations;
private Set<NodeId> activeRecordedNodes;
private ConcurrentMap<ApplicationId, Long>
recordingAppActivitiesUntilSpecifiedTime;
@@ -63,7 +69,7 @@ public class ActivitiesManager extends AbstractService {
public ActivitiesManager(RMContext rmContext) {
super(ActivitiesManager.class.getName());
- recordingNodesAllocation = new ConcurrentHashMap<>();
+ recordingNodesAllocation = ThreadLocal.withInitial(() -> new HashMap());
completedNodeAllocations = new ConcurrentHashMap<>();
appsAllocation = new ConcurrentHashMap<>();
completedAppAllocations = new ConcurrentHashMap<>();
@@ -173,9 +179,11 @@ public class ActivitiesManager extends AbstractService {
if (recordNextAvailableNode) {
recordNextNodeUpdateActivities(nodeID.toString());
}
- if (activeRecordedNodes.contains(nodeID)) {
+ // Remove from activeRecordedNodes immediately to ensure that the
+ // activities are recorded only once across multiple threads.
+ if (activeRecordedNodes.remove(nodeID)) {
List<NodeAllocation> nodeAllocation = new ArrayList<>();
- recordingNodesAllocation.put(nodeID, nodeAllocation);
+ recordingNodesAllocation.get().put(nodeID, nodeAllocation);
}
}
@@ -199,12 +207,11 @@ public class ActivitiesManager extends AbstractService {
}
// Add queue, application or container activity into specific node allocation.
- void addSchedulingActivityForNode(SchedulerNode node, String parentName,
+ void addSchedulingActivityForNode(NodeId nodeId, String parentName,
String childName, String priority, ActivityState state, String diagnostic,
String type) {
- if (shouldRecordThisNode(node.getNodeID())) {
- NodeAllocation nodeAllocation = getCurrentNodeAllocation(
- node.getNodeID());
+ if (shouldRecordThisNode(nodeId)) {
+ NodeAllocation nodeAllocation = getCurrentNodeAllocation(nodeId);
nodeAllocation.addAllocationActivity(parentName, childName, priority,
state, diagnostic, type);
}
@@ -262,7 +269,7 @@ public class ActivitiesManager extends AbstractService {
}
void finishNodeUpdateRecording(NodeId nodeID) {
- List<NodeAllocation> value = recordingNodesAllocation.get(nodeID);
+ List<NodeAllocation> value = recordingNodesAllocation.get().get(nodeID);
long timeStamp = SystemClock.getInstance().getTime();
if (value != null) {
@@ -278,9 +285,8 @@ public class ActivitiesManager extends AbstractService {
}
if (shouldRecordThisNode(nodeID)) {
- recordingNodesAllocation.remove(nodeID);
+ recordingNodesAllocation.get().remove(nodeID);
completedNodeAllocations.put(nodeID, value);
- stopRecordNodeUpdateActivities(nodeID);
}
}
}
@@ -291,12 +297,15 @@ public class ActivitiesManager extends AbstractService {
}
boolean shouldRecordThisNode(NodeId nodeID) {
- return activeRecordedNodes.contains(nodeID) && recordingNodesAllocation
+ return isRecordingMultiNodes() || recordingNodesAllocation.get()
.containsKey(nodeID);
}
private NodeAllocation getCurrentNodeAllocation(NodeId nodeID) {
- List<NodeAllocation> nodeAllocations = recordingNodesAllocation.get(nodeID);
+ NodeId recordingKey =
+ isRecordingMultiNodes() ? EMPTY_NODE_ID : nodeID;
+ List<NodeAllocation> nodeAllocations =
+ recordingNodesAllocation.get().get(recordingKey);
NodeAllocation nodeAllocation;
// When this node has already stored allocation activities, get the
// last allocation for this node.
@@ -323,11 +332,29 @@ public class ActivitiesManager extends AbstractService {
return nodeAllocation;
}
- private void stopRecordNodeUpdateActivities(NodeId nodeId) {
- activeRecordedNodes.remove(nodeId);
- }
-
private void turnOffActivityMonitoringForApp(ApplicationId applicationId) {
recordingAppActivitiesUntilSpecifiedTime.remove(applicationId);
}
+
+ public boolean isRecordingMultiNodes() {
+ return recordingNodesAllocation.get().containsKey(EMPTY_NODE_ID);
+ }
+
+ /**
+ * Get recording node id:
+ * 1. node id of the input node if it is not null.
+ * 2. EMPTY_NODE_ID if input node is null and activities manager is
+ * recording multi-nodes.
+ * 3. null otherwise.
+ * @param node - input node
+ * @return recording nodeId
+ */
+ public NodeId getRecordingNodeId(SchedulerNode node) {
+ if (node != null) {
+ return node.getNodeID();
+ } else if (isRecordingMultiNodes()) {
+ return ActivitiesManager.EMPTY_NODE_ID;
+ }
+ return null;
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 4baf405..b8fdd42 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1297,17 +1297,12 @@ public class CapacityScheduler extends
if (!scheduleAsynchronously) {
writeLock.lock();
try {
- ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
- rmNode.getNodeID());
-
// reset allocation and reservation stats before we start doing any
// work
updateSchedulerHealth(lastNodeUpdateTime, rmNode.getNodeID(),
CSAssignment.NULL_ASSIGNMENT);
allocateContainersToNode(rmNode.getNodeID(), true);
- ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
- rmNode.getNodeID());
} finally {
writeLock.unlock();
}
@@ -1706,10 +1701,18 @@ public class CapacityScheduler extends
// nodes.
CSAssignment assignment;
if (!multiNodePlacementEnabled) {
+ ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+ node.getNodeID());
assignment = allocateContainerOnSingleNode(candidates,
node, withNodeHeartbeat);
+ ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+ node.getNodeID());
} else{
+ ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+ ActivitiesManager.EMPTY_NODE_ID);
assignment = allocateContainersOnMultiNodes(candidates);
+ ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+ ActivitiesManager.EMPTY_NODE_ID);
}
if (assignment != null && assignment.getAssignmentInformation() != null
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
new file mode 100644
index 0000000..5216a21
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test class for {@link ActivitiesManager}.
+ */
+public class TestActivitiesManager {
+
+ private final static int NUM_NODES = 5;
+
+ private final static int NUM_APPS = 5;
+
+ private final static int NUM_THREADS = 5;
+
+ private RMContext rmContext;
+
+ private TestingActivitiesManager activitiesManager;
+
+ private List<SchedulerApplicationAttempt> apps;
+
+ private List<SchedulerNode> nodes;
+
+ private ThreadPoolExecutor threadPoolExecutor;
+
+ @Before
+ public void setup() {
+ rmContext = Mockito.mock(RMContext.class);
+ ResourceScheduler scheduler = Mockito.mock(ResourceScheduler.class);
+ Mockito.when(scheduler.getMinimumResourceCapability())
+ .thenReturn(Resources.none());
+ Mockito.when(rmContext.getScheduler()).thenReturn(scheduler);
+ LeafQueue mockQueue = Mockito.mock(LeafQueue.class);
+ Map<ApplicationId, RMApp> rmApps = new ConcurrentHashMap<>();
+ Mockito.doReturn(rmApps).when(rmContext).getRMApps();
+ apps = new ArrayList<>();
+ for (int i = 0; i < NUM_APPS; i++) {
+ ApplicationAttemptId appAttemptId =
+ TestUtils.getMockApplicationAttemptId(i, 0);
+ RMApp mockApp = Mockito.mock(RMApp.class);
+ Mockito.doReturn(appAttemptId.getApplicationId()).when(mockApp)
+ .getApplicationId();
+ rmApps.put(appAttemptId.getApplicationId(), mockApp);
+ FiCaSchedulerApp app =
+ new FiCaSchedulerApp(appAttemptId, "user", mockQueue,
+ mock(ActiveUsersManager.class), rmContext);
+ apps.add(app);
+ }
+ nodes = new ArrayList<>();
+ for (int i = 0; i < NUM_NODES; i++) {
+ nodes.add(TestUtils.getMockNode("host" + i, "rack", 1, 10240));
+ }
+ activitiesManager = new TestingActivitiesManager(rmContext);
+ threadPoolExecutor =
+ new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 3L,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<>());
+ }
+
+ /**
+ * Test recording activities belonging to different nodes in multiple
+ * threads; these threads should run without interference and every
+ * thread should record exactly one activity.
+ */
+ @Test
+ public void testRecordingDifferentNodeActivitiesInMultiThreads()
+ throws Exception {
+ Random rand = new Random();
+ List<Future<Void>> futures = new ArrayList<>();
+ for (SchedulerNode node : nodes) {
+ Callable<Void> task = () -> {
+ SchedulerApplicationAttempt randomApp =
+ apps.get(rand.nextInt(NUM_APPS));
+ // start recording activities for random node
+ activitiesManager.recordNextNodeUpdateActivities(
+ node.getNodeID().toString());
+ // generate node/app activities
+ ActivitiesLogger.NODE
+ .startNodeUpdateRecording(activitiesManager, node.getNodeID());
+ ActivitiesLogger.APP
+ .recordAppActivityWithoutAllocation(activitiesManager, node,
+ randomApp, Priority.newInstance(0),
+ ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
+ ActivityState.REJECTED);
+ ActivitiesLogger.NODE
+ .finishNodeUpdateRecording(activitiesManager, node.getNodeID());
+ return null;
+ };
+ futures.add(threadPoolExecutor.submit(task));
+ }
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ // Check activities for all nodes should be recorded and every node should
+ // have only one allocation information.
+ Assert.assertEquals(NUM_NODES,
+ activitiesManager.historyNodeAllocations.size());
+ for (List<List<NodeAllocation>> nodeAllocationsForThisNode :
+ activitiesManager.historyNodeAllocations.values()) {
+ Assert.assertEquals(1, nodeAllocationsForThisNode.size());
+ Assert.assertEquals(1, nodeAllocationsForThisNode.get(0).size());
+ }
+ }
+
+ /**
+ * Test recording activities for multi-node assignments in multiple threads;
+ * the activity info should be recorded only once, by one of these threads.
+ */
+ @Test
+ public void testRecordingSchedulerActivitiesForMultiNodesInMultiThreads()
+ throws Exception {
+ Random rand = new Random();
+ // start recording activities for multi-nodes
+ activitiesManager.recordNextNodeUpdateActivities(
+ ActivitiesManager.EMPTY_NODE_ID.toString());
+ List<Future<Void>> futures = new ArrayList<>();
+ // generate node/app activities
+ for (SchedulerNode node : nodes) {
+ Callable<Void> task = () -> {
+ SchedulerApplicationAttempt randomApp =
+ apps.get(rand.nextInt(NUM_APPS));
+ ActivitiesLogger.NODE.startNodeUpdateRecording(activitiesManager,
+ ActivitiesManager.EMPTY_NODE_ID);
+ ActivitiesLogger.APP
+ .recordAppActivityWithoutAllocation(activitiesManager, node,
+ randomApp, Priority.newInstance(0),
+ ActivityDiagnosticConstant.FAIL_TO_ALLOCATE,
+ ActivityState.REJECTED);
+ ActivitiesLogger.NODE.finishNodeUpdateRecording(activitiesManager,
+ ActivitiesManager.EMPTY_NODE_ID);
+ return null;
+ };
+ futures.add(threadPoolExecutor.submit(task));
+ }
+ for (Future<Void> future : futures) {
+ future.get();
+ }
+ // Check activities for multi-nodes should be recorded only once
+ Assert.assertEquals(1, activitiesManager.historyNodeAllocations.size());
+ }
+
+ /**
+ * Activities manager for testing that records the full history of
+ * node allocations.
+ */
+ public class TestingActivitiesManager extends ActivitiesManager {
+
+ private Map<NodeId, List<List<NodeAllocation>>> historyNodeAllocations =
+ new ConcurrentHashMap<>();
+
+ public TestingActivitiesManager(RMContext rmContext) {
+ super(rmContext);
+ super.completedNodeAllocations = Mockito.spy(new ConcurrentHashMap<>());
+ Mockito.doAnswer((invocationOnMock) -> {
+ NodeId nodeId = (NodeId) invocationOnMock.getArguments()[0];
+ List<NodeAllocation> nodeAllocations =
+ (List<NodeAllocation>) invocationOnMock.getArguments()[1];
+ List<List<NodeAllocation>> historyAllocationsForThisNode =
+ historyNodeAllocations.get(nodeId);
+ if (historyAllocationsForThisNode == null) {
+ historyAllocationsForThisNode = new ArrayList<>();
+ historyNodeAllocations.put(nodeId, historyAllocationsForThisNode);
+ }
+ historyAllocationsForThisNode.add(nodeAllocations);
+ return null;
+ }).when(completedNodeAllocations).put(any(NodeId.class),
+ any(List.class));
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
index 91b92de..932f58d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
@@ -95,16 +95,12 @@ public class TestRMWebServicesSchedulerActivities
response.getType().toString());
json = response.getEntity(JSONObject.class);
- verifyNumberOfAllocations(json, 11);
-
- JSONArray allocations = json.getJSONArray("allocations");
- for (int i = 0; i < allocations.length(); i++) {
- if (i != allocations.length() - 1) {
- verifyStateOfAllocations(allocations.getJSONObject(i),
- "finalAllocationState", "ALLOCATED");
- verifyQueueOrder(allocations.getJSONObject(i), "root-a-b-b2-b3-b1");
- }
- }
+ // The collection logic for scheduler activities changed in YARN-9313:
+ // only one allocation should now be recorded in all scenarios.
+ verifyNumberOfAllocations(json, 1);
+ verifyStateOfAllocations(json.getJSONObject("allocations"),
+ "finalAllocationState", "ALLOCATED");
+ verifyQueueOrder(json.getJSONObject("allocations"), "root-a-b-b2-b3-b1");
}
finally {
rm.stop();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
new file mode 100644
index 0000000..724d592
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.webapp;
+
+import com.google.inject.Guice;
+import com.google.inject.servlet.ServletModule;
+import com.sun.jersey.api.client.ClientResponse;
+import com.sun.jersey.api.client.WebResource;
+import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
+import com.sun.jersey.test.framework.WebAppDescriptor;
+import org.apache.hadoop.http.JettyUtils;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
+import org.apache.hadoop.yarn.webapp.JerseyTestBase;
+import org.codehaus.jettison.json.JSONArray;
+import org.codehaus.jettison.json.JSONObject;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.ws.rs.core.MediaType;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for scheduler activities reported by the RM web service
+ * (scheduler/activities) when multi-node placement is enabled.
+ */
+public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
+    extends JerseyTestBase {
+
+  // Assigned by WebServletModule each time setUp() builds a new injector.
+  private static MockRM rm;
+  private static CapacitySchedulerConfiguration csConf;
+  private static YarnConfiguration conf;
+
+  public TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled() {
+    super(new WebAppDescriptor.Builder(
+        "org.apache.hadoop.yarn.server.resourcemanager.webapp")
+        .contextListenerClass(GuiceServletConfig.class)
+        .filterClass(com.google.inject.servlet.GuiceFilter.class)
+        .contextPath("jersey-guice-filter").servletPath("/").build());
+  }
+
+  /** Serves RM web services from a multi-node capacity-scheduler MockRM. */
+  private static class WebServletModule extends ServletModule {
+    @Override
+    protected void configureServlets() {
+      bind(JAXBContextResolver.class);
+      bind(RMWebServices.class);
+      bind(GenericExceptionHandler.class);
+      csConf = new CapacitySchedulerConfiguration();
+      setupQueueConfiguration(csConf);
+
+      conf = new YarnConfiguration(csConf);
+      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+          ResourceScheduler.class);
+      // enable multi-nodes placement
+      conf.setBoolean(
+          CapacitySchedulerConfiguration.MULTI_NODE_PLACEMENT_ENABLED, true);
+      rm = new MockRM(conf);
+      bind(ResourceManager.class).toInstance(rm);
+      serve("/*").with(GuiceContainer.class);
+    }
+  }
+
+  /** Defines top-level queues root.a (10.5, max 50) and root.b (89.5). */
+  private static void setupQueueConfiguration(
+      CapacitySchedulerConfiguration config) {
+    // Define top-level queues
+    config.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+
+    final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
+    config.setCapacity(queueA, 10.5f);
+    config.setMaximumCapacity(queueA, 50);
+
+    final String queueB = CapacitySchedulerConfiguration.ROOT + ".b";
+    config.setCapacity(queueB, 89.5f);
+  }
+
+  @Before
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    GuiceServletConfig.setInjector(
+        Guice.createInjector(new WebServletModule()));
+  }
+
+  /** One pending 1GB request on a 2GB node: one ALLOCATED allocation. */
+  @Test (timeout=30000)
+  public void testAssignContainer() throws Exception {
+    try {
+      // Start RM inside the try block so that it is always stopped
+      rm.start();
+      MockNM nm = new MockNM("127.0.0.1:1234", 2 * 1024,
+          rm.getResourceTrackerService());
+      nm.registerNode();
+
+      RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
+      am1.allocate(Arrays.asList(ResourceRequest
+          .newInstance(Priority.UNDEFINED, "127.0.0.1",
+              Resources.createResource(1024), 1), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "/default-rack",
+              Resources.createResource(1024), 1), ResourceRequest
+          .newInstance(Priority.UNDEFINED, "*", Resources.createResource(1024),
+              1)), null);
+
+      // Trigger recording for multi-nodes without params
+      WebResource r = resource();
+      getActivities(r);
+      // Trigger scheduling for this app
+      sendNodeUpdate(nm);
+
+      // Check scheduler activities, it should contain one allocation and
+      // final allocation state is ALLOCATED
+      JSONObject json = getActivities(r).getEntity(JSONObject.class);
+      verifyNumberOfAllocations(json, 1);
+      verifyStateOfAllocations(json.getJSONObject("allocations"),
+          "finalAllocationState", "ALLOCATED");
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /** A heartbeat with nothing pending: one SKIPPED allocation. */
+  @Test (timeout=30000)
+  public void testSchedulingWithoutPendingRequests() throws Exception {
+    try {
+      // Start RM inside the try block so that it is always stopped
+      rm.start();
+      MockNM nm = new MockNM("127.0.0.1:1234", 8 * 1024,
+          rm.getResourceTrackerService());
+      nm.registerNode();
+
+      // Trigger recording for multi-nodes without params
+      WebResource r = resource();
+      getActivities(r);
+      // Trigger scheduling while no app has been submitted
+      sendNodeUpdate(nm);
+
+      // Check scheduler activities, it should contain one allocation and
+      // final allocation state is SKIPPED
+      JSONObject json = getActivities(r).getEntity(JSONObject.class);
+      verifyNumberOfAllocations(json, 1);
+      verifyStateOfAllocations(json.getJSONObject("allocations"),
+          "finalAllocationState", "SKIPPED");
+    } finally {
+      rm.stop();
+    }
+  }
+
+  /** GETs scheduler activities and asserts the response is JSON. */
+  private ClientResponse getActivities(WebResource r) {
+    ClientResponse response = r.path("ws").path("v1").path("cluster")
+        .path("scheduler/activities").accept(MediaType.APPLICATION_JSON)
+        .get(ClientResponse.class);
+    assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
+        response.getType().toString());
+    return response;
+  }
+
+  /** Sends a node update for {@code nm} directly to the scheduler. */
+  private static void sendNodeUpdate(MockNM nm) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    RMNode rmNode = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode));
+  }
+
+  /** Asserts the count; one allocation is an object, several an array. */
+  private void verifyNumberOfAllocations(JSONObject json, int expected)
+      throws Exception {
+    int actual;
+    if (json.isNull("allocations")) {
+      actual = 0;
+    } else {
+      Object object = json.get("allocations");
+      if (object instanceof JSONArray) {
+        actual = ((JSONArray) object).length();
+      } else if (object instanceof JSONObject) {
+        actual = 1;
+      } else {
+        // Fail instead of silently passing on an unexpected shape
+        throw new AssertionError("Unexpected allocations: " + object);
+      }
+    }
+    assertEquals("Number of allocations is wrong in: " + json, expected,
+        actual);
+  }
+
+  /** Asserts allocation's {@code nameToCheck} field equals expectedState. */
+  private void verifyStateOfAllocations(JSONObject allocation,
+      String nameToCheck, String expectedState) throws Exception {
+    assertEquals("State of allocation is wrong", expectedState,
+        allocation.get(nameToCheck));
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org