You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by pr...@apache.org on 2020/07/24 06:17:45 UTC
[hadoop] branch trunk updated: YARN-10319. Record Last N Scheduler
Activities from ActivitiesManager
This is an automated email from the ASF dual-hosted git repository.
prabhujoseph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 247eb09 YARN-10319. Record Last N Scheduler Activities from ActivitiesManager
247eb09 is described below
commit 247eb0979b6a3a723ea9a249ba4db1ee079eb909
Author: Prabhu Joseph <pr...@apache.org>
AuthorDate: Tue Jun 16 21:52:32 2020 +0530
YARN-10319. Record Last N Scheduler Activities from ActivitiesManager
Reviewed by Tao Yang and Adam Antal.
---
.../scheduler/activities/ActivitiesManager.java | 40 ++-
.../webapp/JAXBContextResolver.java | 3 +-
.../server/resourcemanager/webapp/RMWSConsts.java | 5 +
.../webapp/RMWebServiceProtocol.java | 14 +
.../resourcemanager/webapp/RMWebServices.java | 331 ++++++++++++---------
.../webapp/dao/BulkActivitiesInfo.java | 52 ++++
.../webapp/ActivitiesTestUtils.java | 2 +
.../TestRMWebServicesSchedulerActivities.java | 100 +++++++
.../webapp/DefaultRequestInterceptorREST.java | 10 +
.../router/webapp/FederationInterceptorREST.java | 7 +
.../server/router/webapp/RouterWebServices.java | 19 ++
.../router/webapp/BaseRouterWebServicesTest.java | 7 +
.../router/webapp/MockRESTRequestInterceptor.java | 7 +
.../webapp/PassThroughRESTRequestInterceptor.java | 8 +
.../src/site/markdown/ResourceManagerRest.md | 134 +++++++++
15 files changed, 593 insertions(+), 146 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index cc02ff6..3662a77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -44,8 +45,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInf
import org.apache.hadoop.yarn.util.SystemClock;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.List;
import java.util.Set;
import java.util.*;
@@ -75,7 +78,7 @@ public class ActivitiesManager extends AbstractService {
appsAllocation;
@VisibleForTesting
ConcurrentMap<ApplicationId, Queue<AppAllocation>> completedAppAllocations;
- private boolean recordNextAvailableNode = false;
+ private AtomicInteger recordCount = new AtomicInteger(0);
private List<NodeAllocation> lastAvailableNodeActivities = null;
private Thread cleanUpThread;
private long activitiesCleanupIntervalMs;
@@ -86,6 +89,8 @@ public class ActivitiesManager extends AbstractService {
private final RMContext rmContext;
private volatile boolean stopped;
private ThreadLocal<DiagnosticsCollectorManager> diagnosticCollectorManager;
+ private volatile ConcurrentLinkedDeque<Pair<NodeId, List<NodeAllocation>>>
+ lastNActivities;
public ActivitiesManager(RMContext rmContext) {
super(ActivitiesManager.class.getName());
@@ -102,6 +107,7 @@ public class ActivitiesManager extends AbstractService {
if (rmContext.getYarnConfiguration() != null) {
setupConfForCleanup(rmContext.getYarnConfiguration());
}
+ lastNActivities = new ConcurrentLinkedDeque<>();
}
private void setupConfForCleanup(Configuration conf) {
@@ -215,9 +221,30 @@ public class ActivitiesManager extends AbstractService {
return new ActivitiesInfo(allocations, nodeId, groupBy);
}
+
+ public List<ActivitiesInfo> recordAndGetBulkActivitiesInfo(
+ int activitiesCount, RMWSConsts.ActivitiesGroupBy groupBy)
+ throws InterruptedException {
+ recordCount.set(activitiesCount);
+ while (recordCount.get() > 0) {
+ Thread.sleep(1);
+ }
+ Iterator<Pair<NodeId, List<NodeAllocation>>> ite =
+ lastNActivities.iterator();
+ List<ActivitiesInfo> outList = new ArrayList<>();
+ while (ite.hasNext()) {
+ Pair<NodeId, List<NodeAllocation>> pair = ite.next();
+ outList.add(new ActivitiesInfo(pair.getRight(),
+ pair.getLeft().toString(), groupBy));
+ }
+ // reset with new activities
+ lastNActivities = new ConcurrentLinkedDeque<>();
+ return outList;
+ }
+
public void recordNextNodeUpdateActivities(String nodeId) {
if (nodeId == null) {
- recordNextAvailableNode = true;
+ recordCount.compareAndSet(0, 1);
} else {
activeRecordedNodes.add(NodeId.fromString(nodeId));
}
@@ -348,7 +375,7 @@ public class ActivitiesManager extends AbstractService {
}
void startNodeUpdateRecording(NodeId nodeID) {
- if (recordNextAvailableNode) {
+ if (recordCount.get() > 0) {
recordNextNodeUpdateActivities(nodeID.toString());
}
// Removing from activeRecordedNodes immediately is to ensure that
@@ -470,14 +497,17 @@ public class ActivitiesManager extends AbstractService {
allocation.setTimestamp(timestamp);
allocation.setPartition(partition);
}
- if (recordNextAvailableNode) {
- recordNextAvailableNode = false;
+ if (recordCount.get() > 0) {
+ recordCount.getAndDecrement();
}
}
if (shouldRecordThisNode(nodeID)) {
recordingNodesAllocation.get().remove(nodeID);
completedNodeAllocations.put(nodeID, value);
+ if (recordCount.get() >= 0) {
+ lastNActivities.add(Pair.of(nodeID, value));
+ }
}
}
// disable diagnostic collector
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java
index f6eb2ad..19aa201 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/JAXBContextResolver.java
@@ -57,7 +57,8 @@ public class JAXBContextResolver implements ContextResolver<JAXBContext> {
FairSchedulerQueueInfoList.class, AppTimeoutsInfo.class,
AppTimeoutInfo.class, ResourceInformationsInfo.class,
ActivitiesInfo.class, AppActivitiesInfo.class,
- QueueAclsInfo.class, QueueAclInfo.class};
+ QueueAclsInfo.class, QueueAclInfo.class,
+ BulkActivitiesInfo.class};
// these dao classes need root unwrapping
final Class[] rootUnwrappedTypes =
{ NewApplication.class, ApplicationSubmissionContextInfo.class,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
index 30406e5..82ceed3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
@@ -81,6 +81,10 @@ public final class RMWSConsts {
/** Path for {@code RMWebServiceProtocol#getActivities}. */
public static final String SCHEDULER_ACTIVITIES = "/scheduler/activities";
+ /** Path for {@code RMWebServiceProtocol#getBulkActivities}. */
+ public static final String SCHEDULER_BULK_ACTIVITIES =
+ "/scheduler/bulk-activities";
+
/** Path for {@code RMWebServiceProtocol#getAppActivities}. */
public static final String SCHEDULER_APP_ACTIVITIES =
"/scheduler/app-activities/{appid}";
@@ -252,6 +256,7 @@ public final class RMWSConsts {
public static final String ACTIONS = "actions";
public static final String SUMMARIZE = "summarize";
public static final String NAME = "name";
+ public static final String ACTIVITIES_COUNT = "activitiesCount";
private RMWSConsts() {
// not called
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java
index a41208e..f2736e3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
/**
@@ -213,6 +214,19 @@ public interface RMWebServiceProtocol {
String groupBy);
/**
+ * This method retrieves the last n activities inside scheduler and it is
+ * reachable by using {@link RMWSConsts#SCHEDULER_BULK_ACTIVITIES}.
+ *
+ * @param hsr the servlet request
+ * @param groupBy the groupBy type by which the activities should be
+ * aggregated. It is a QueryParam.
+ * @param activitiesCount number of activities
+ * @return last n activities
+ */
+ BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
+ String groupBy, int activitiesCount) throws InterruptedException;
+
+ /**
* This method retrieves all the activities for a specific app for a specific
* period of time, and it is reachable by using
* {@link RMWSConsts#SCHEDULER_APP_ACTIVITIES}.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index dfdaba9..7c4e5df 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -197,6 +197,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdat
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.StatisticsItemInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ConfigVersionInfo;
@@ -242,6 +243,9 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
public static final String DEFAULT_END_TIME = "-1";
public static final String DEFAULT_INCLUDE_RESOURCE = "false";
public static final String DEFAULT_SUMMARIZE = "false";
+ public static final String DEFAULT_ACTIVITIES_COUNT = "10";
+ public static final int MAX_ACTIVITIES_COUNT = 500;
+ private static final String ERROR_MSG = "Not Capacity Scheduler";
@VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true;
@@ -697,76 +701,133 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
public ActivitiesInfo getActivities(@Context HttpServletRequest hsr,
@QueryParam(RMWSConsts.NODEID) String nodeId,
@QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
- initForReadableEndpoints();
- YarnScheduler scheduler = rm.getRMContext().getScheduler();
- if (scheduler instanceof AbstractYarnScheduler) {
- String errMessage = "";
+ initForReadableEndpoints();
- AbstractYarnScheduler abstractYarnScheduler =
- (AbstractYarnScheduler) scheduler;
+ ActivitiesManager activitiesManager = getActivitiesManager();
+ if (null == activitiesManager) {
+ return new ActivitiesInfo(ERROR_MSG, nodeId);
+ }
- ActivitiesManager activitiesManager =
- abstractYarnScheduler.getActivitiesManager();
- if (null == activitiesManager) {
- errMessage = "Not Capacity Scheduler";
- return new ActivitiesInfo(errMessage, nodeId);
- }
+ RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
+ try {
+ activitiesGroupBy = parseActivitiesGroupBy(groupBy);
+ } catch (IllegalArgumentException e) {
+ return new ActivitiesInfo(e.getMessage(), nodeId);
+ }
- RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
- try {
- activitiesGroupBy = parseActivitiesGroupBy(groupBy);
- } catch (IllegalArgumentException e) {
- return new ActivitiesInfo(e.getMessage(), nodeId);
- }
+ AbstractYarnScheduler abstractYarnScheduler =
+ (AbstractYarnScheduler) rm.getRMContext().getScheduler();
- List<FiCaSchedulerNode> nodeList =
- abstractYarnScheduler.getNodeTracker().getAllNodes();
+ List<FiCaSchedulerNode> nodeList =
+ abstractYarnScheduler.getNodeTracker().getAllNodes();
- boolean illegalInput = false;
+ boolean illegalInput = false;
+ String errMessage = "";
- if (nodeList.size() == 0) {
- illegalInput = true;
- errMessage = "No node manager running in the cluster";
- } else {
- if (nodeId != null) {
- String hostName = nodeId;
- String portName = "";
- if (nodeId.contains(":")) {
- int index = nodeId.indexOf(":");
- hostName = nodeId.substring(0, index);
- portName = nodeId.substring(index + 1);
- }
+ if (nodeList.size() == 0) {
+ illegalInput = true;
+ errMessage = "No node manager running in the cluster";
+ } else {
+ if (nodeId != null) {
+ String hostName = nodeId;
+ String portName = "";
+ if (nodeId.contains(":")) {
+ int index = nodeId.indexOf(":");
+ hostName = nodeId.substring(0, index);
+ portName = nodeId.substring(index + 1);
+ }
- boolean correctNodeId = false;
- for (FiCaSchedulerNode node : nodeList) {
- if ((portName.equals("")
- && node.getRMNode().getHostName().equals(hostName))
- || (!portName.equals("")
- && node.getRMNode().getHostName().equals(hostName)
- && String.valueOf(node.getRMNode().getCommandPort())
- .equals(portName))) {
- correctNodeId = true;
- nodeId = node.getNodeID().toString();
- break;
- }
- }
- if (!correctNodeId) {
- illegalInput = true;
- errMessage = "Cannot find node manager with given node id";
+ boolean correctNodeId = false;
+ for (FiCaSchedulerNode node : nodeList) {
+ if ((portName.equals("")
+ && node.getRMNode().getHostName().equals(hostName))
+ || (!portName.equals("")
+ && node.getRMNode().getHostName().equals(hostName)
+ && String.valueOf(node.getRMNode().getCommandPort())
+ .equals(portName))) {
+ correctNodeId = true;
+ nodeId = node.getNodeID().toString();
+ break;
}
}
+ if (!correctNodeId) {
+ illegalInput = true;
+ errMessage = "Cannot find node manager with given node id";
+ }
}
+ }
- if (!illegalInput) {
- activitiesManager.recordNextNodeUpdateActivities(nodeId);
- return activitiesManager.getActivitiesInfo(nodeId, activitiesGroupBy);
- }
+ if (!illegalInput) {
+ activitiesManager.recordNextNodeUpdateActivities(nodeId);
+ return activitiesManager.getActivitiesInfo(nodeId, activitiesGroupBy);
+ }
+
+ // Return an activities info with error message
+ return new ActivitiesInfo(errMessage, nodeId);
+ }
+
+
+ @GET
+ @Path(RMWSConsts.SCHEDULER_BULK_ACTIVITIES)
+ @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
+ MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
+ @Override
+ public BulkActivitiesInfo getBulkActivities(
+ @Context HttpServletRequest hsr,
+ @QueryParam(RMWSConsts.GROUP_BY) String groupBy,
+ @QueryParam(RMWSConsts.ACTIVITIES_COUNT)
+ @DefaultValue(DEFAULT_ACTIVITIES_COUNT) int activitiesCount)
+ throws InterruptedException {
+
+ initForReadableEndpoints();
+
+ ActivitiesManager activitiesManager = getActivitiesManager();
+ if (null == activitiesManager) {
+ throw new BadRequestException(ERROR_MSG);
+ }
- // Return a activities info with error message
- return new ActivitiesInfo(errMessage, nodeId);
+ RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
+ try {
+ activitiesGroupBy = parseActivitiesGroupBy(groupBy);
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException(e.getMessage());
}
+ AbstractYarnScheduler abstractYarnScheduler =
+ (AbstractYarnScheduler) rm.getRMContext().getScheduler();
+
+ List<FiCaSchedulerNode> nodeList =
+ abstractYarnScheduler.getNodeTracker().getAllNodes();
+ if (nodeList.size() == 0) {
+ throw new BadRequestException(
+ "No node manager running in the cluster");
+ }
+
+ if (activitiesCount <= 0) {
+ activitiesCount = Integer.parseInt(DEFAULT_ACTIVITIES_COUNT);
+ }
+ activitiesCount = Math.min(activitiesCount, MAX_ACTIVITIES_COUNT);
+
+ List<ActivitiesInfo> activitiesList = activitiesManager
+ .recordAndGetBulkActivitiesInfo(activitiesCount,
+ activitiesGroupBy);
+ BulkActivitiesInfo bulkActivitiesInfo = new
+ BulkActivitiesInfo();
+ bulkActivitiesInfo.addAll(activitiesList);
+
+ return bulkActivitiesInfo;
+ }
+
+ private ActivitiesManager getActivitiesManager() {
+ YarnScheduler scheduler = rm.getRMContext().getScheduler();
+ if (scheduler instanceof AbstractYarnScheduler) {
+ AbstractYarnScheduler abstractYarnScheduler =
+ (AbstractYarnScheduler) scheduler;
+ ActivitiesManager activitiesManager =
+ abstractYarnScheduler.getActivitiesManager();
+ return activitiesManager;
+ }
return null;
}
@@ -788,105 +849,95 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
boolean summarize) {
initForReadableEndpoints();
- YarnScheduler scheduler = rm.getRMContext().getScheduler();
- if (scheduler instanceof AbstractYarnScheduler) {
- AbstractYarnScheduler abstractYarnScheduler =
- (AbstractYarnScheduler) scheduler;
+ ActivitiesManager activitiesManager = getActivitiesManager();
+ if (null == activitiesManager) {
+ return new AppActivitiesInfo(ERROR_MSG, appId);
+ }
- ActivitiesManager activitiesManager =
- abstractYarnScheduler.getActivitiesManager();
- if (null == activitiesManager) {
- String errMessage = "Not Capacity Scheduler";
- return new AppActivitiesInfo(errMessage, appId);
- }
+ if (appId == null) {
+ String errMessage = "Must provide an application Id";
+ return new AppActivitiesInfo(errMessage, null);
+ }
- if (appId == null) {
- String errMessage = "Must provide an application Id";
- return new AppActivitiesInfo(errMessage, null);
- }
+ RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
+ try {
+ activitiesGroupBy = parseActivitiesGroupBy(groupBy);
+ } catch (IllegalArgumentException e) {
+ return new AppActivitiesInfo(e.getMessage(), appId);
+ }
- RMWSConsts.ActivitiesGroupBy activitiesGroupBy;
- try {
- activitiesGroupBy = parseActivitiesGroupBy(groupBy);
- } catch (IllegalArgumentException e) {
- return new AppActivitiesInfo(e.getMessage(), appId);
- }
+ Set<RMWSConsts.AppActivitiesRequiredAction> requiredActions;
+ try {
+ requiredActions =
+ parseAppActivitiesRequiredActions(getFlatSet(actions));
+ } catch (IllegalArgumentException e) {
+ return new AppActivitiesInfo(e.getMessage(), appId);
+ }
- Set<RMWSConsts.AppActivitiesRequiredAction> requiredActions;
- try {
- requiredActions =
- parseAppActivitiesRequiredActions(getFlatSet(actions));
- } catch (IllegalArgumentException e) {
- return new AppActivitiesInfo(e.getMessage(), appId);
- }
+ Set<Integer> parsedRequestPriorities;
+ try {
+ parsedRequestPriorities = getFlatSet(requestPriorities).stream()
+ .map(e -> Integer.valueOf(e)).collect(Collectors.toSet());
+ } catch (NumberFormatException e) {
+ return new AppActivitiesInfo("request priorities must be integers!",
+ appId);
+ }
+ Set<Long> parsedAllocationRequestIds;
+ try {
+ parsedAllocationRequestIds = getFlatSet(allocationRequestIds).stream()
+ .map(e -> Long.valueOf(e)).collect(Collectors.toSet());
+ } catch (NumberFormatException e) {
+ return new AppActivitiesInfo(
+ "allocation request Ids must be integers!", appId);
+ }
- Set<Integer> parsedRequestPriorities;
+ int limitNum = -1;
+ if (limit != null) {
try {
- parsedRequestPriorities = getFlatSet(requestPriorities).stream()
- .map(e -> Integer.valueOf(e)).collect(Collectors.toSet());
- } catch (NumberFormatException e) {
- return new AppActivitiesInfo("request priorities must be integers!",
- appId);
- }
- Set<Long> parsedAllocationRequestIds;
- try {
- parsedAllocationRequestIds = getFlatSet(allocationRequestIds).stream()
- .map(e -> Long.valueOf(e)).collect(Collectors.toSet());
- } catch (NumberFormatException e) {
- return new AppActivitiesInfo(
- "allocation request Ids must be integers!", appId);
- }
-
- int limitNum = -1;
- if (limit != null) {
- try {
- limitNum = Integer.parseInt(limit);
- if (limitNum <= 0) {
- return new AppActivitiesInfo(
- "limit must be greater than 0!", appId);
- }
- } catch (NumberFormatException e) {
- return new AppActivitiesInfo("limit must be integer!", appId);
+ limitNum = Integer.parseInt(limit);
+ if (limitNum <= 0) {
+ return new AppActivitiesInfo(
+ "limit must be greater than 0!", appId);
}
+ } catch (NumberFormatException e) {
+ return new AppActivitiesInfo("limit must be integer!", appId);
}
+ }
- double maxTime = 3.0;
+ double maxTime = 3.0;
- if (time != null) {
- if (time.contains(".")) {
- maxTime = Double.parseDouble(time);
- } else {
- maxTime = Double.parseDouble(time + ".0");
- }
+ if (time != null) {
+ if (time.contains(".")) {
+ maxTime = Double.parseDouble(time);
+ } else {
+ maxTime = Double.parseDouble(time + ".0");
}
+ }
- ApplicationId applicationId;
- try {
- applicationId = ApplicationId.fromString(appId);
- if (requiredActions
- .contains(RMWSConsts.AppActivitiesRequiredAction.REFRESH)) {
- activitiesManager
- .turnOnAppActivitiesRecording(applicationId, maxTime);
- }
- if (requiredActions
- .contains(RMWSConsts.AppActivitiesRequiredAction.GET)) {
- AppActivitiesInfo appActivitiesInfo = activitiesManager
- .getAppActivitiesInfo(applicationId, parsedRequestPriorities,
- parsedAllocationRequestIds, activitiesGroupBy, limitNum,
- summarize, maxTime);
- return appActivitiesInfo;
- }
- return new AppActivitiesInfo("Successfully received "
- + (actions.size() == 1 ? "action: " : "actions: ")
- + StringUtils.join(',', actions), appId);
- } catch (Exception e) {
- String errMessage = "Cannot find application with given appId";
- LOG.error(errMessage, e);
- return new AppActivitiesInfo(errMessage, appId);
+ ApplicationId applicationId;
+ try {
+ applicationId = ApplicationId.fromString(appId);
+ if (requiredActions
+ .contains(RMWSConsts.AppActivitiesRequiredAction.REFRESH)) {
+ activitiesManager
+ .turnOnAppActivitiesRecording(applicationId, maxTime);
}
-
+ if (requiredActions
+ .contains(RMWSConsts.AppActivitiesRequiredAction.GET)) {
+ AppActivitiesInfo appActivitiesInfo = activitiesManager
+ .getAppActivitiesInfo(applicationId, parsedRequestPriorities,
+ parsedAllocationRequestIds, activitiesGroupBy, limitNum,
+ summarize, maxTime);
+ return appActivitiesInfo;
+ }
+ return new AppActivitiesInfo("Successfully received "
+ + (actions.size() == 1 ? "action: " : "actions: ")
+ + StringUtils.join(',', actions), appId);
+ } catch (Exception e) {
+ String errMessage = "Cannot find application with given appId";
+ LOG.error(errMessage, e);
+ return new AppActivitiesInfo(errMessage, appId);
}
- return null;
}
private Set<String> getFlatSet(Set<String> set) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/BulkActivitiesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/BulkActivitiesInfo.java
new file mode 100644
index 0000000..ad360cc
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/BulkActivitiesInfo.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
+
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.List;
+import java.util.ArrayList;
+
+/**
+ * DAO object to display a list of allocation activities.
+ */
+@XmlRootElement(name = "bulkActivities")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class BulkActivitiesInfo {
+
+ private ArrayList<ActivitiesInfo> activities = new ArrayList<>();
+
+ public BulkActivitiesInfo() {
+ // JAXB needs this
+ }
+
+ public void add(ActivitiesInfo activitiesInfo) {
+ activities.add(activitiesInfo);
+ }
+
+ public ArrayList<ActivitiesInfo> getActivities() {
+ return activities;
+ }
+
+ public void addAll(List<ActivitiesInfo> activitiesInfoList) {
+ activities.addAll(activitiesInfoList);
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java
index 3c6db7d..dce1b64 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java
@@ -99,6 +99,8 @@ public final class ActivitiesTestUtils {
public static final String FN_SCHEDULER_ACT_ROOT = "activities";
+ public static final String FN_SCHEDULER_BULK_ACT_ROOT = "bulkActivities";
+
private ActivitiesTestUtils(){}
public static List<JSONObject> findInAllocations(JSONObject allocationObj,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
index 1dd8020..f864794 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import com.sun.jersey.api.client.ClientResponse;
import com.sun.jersey.api.client.WebResource;
import com.sun.jersey.core.util.MultivaluedMapImpl;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
@@ -72,6 +73,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTes
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_NAME;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_ALLOCATIONS_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_ACT_ROOT;
+import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.FN_SCHEDULER_BULK_ACT_ROOT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.TOTAL_RESOURCE_INSUFFICIENT_DIAGNOSTIC_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.UNMATCHED_PARTITION_OR_PC_DIAGNOSTIC_PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.ActivitiesTestUtils.getFirstSubNodeFromJson;
@@ -1586,4 +1588,102 @@ public class TestRMWebServicesSchedulerActivities
rm.stop();
}
}
+
+ /**
+ * Verifies the scheduler bulk-activities REST endpoint: a request for 5
+ * activities must return exactly 5 entries, and a request for 1000 must be
+ * capped at {@link RMWebServices#MAX_ACTIVITIES_COUNT}.
+ *
+ * Each request is issued from a {@code RESTClient} thread while this thread
+ * keeps heartbeating both node managers until the client reports completion.
+ */
+ @Test(timeout=30000)
+ public void testSchedulerBulkActivities() throws Exception {
+ rm.start();
+
+ MockNM nm1 = new MockNM("127.0.0.1:1234", 4 * 1024,
+ rm.getResourceTrackerService());
+ MockNM nm2 = new MockNM("127.0.0.2:1234", 4 * 1024,
+ rm.getResourceTrackerService());
+
+ nm1.registerNode();
+ nm2.registerNode();
+
+ MockNM[] nms = new MockNM[] {nm1, nm2};
+
+ try {
+
+ // Validate if response has 5 node activities
+ int expectedCount = 5;
+ RESTClient restClient = new RESTClient(5);
+ restClient.start();
+
+ // NOTE(review): presumably the heartbeats drive the scheduling cycles
+ // whose activities the pending request is waiting for - confirm.
+ sendHeartbeat(restClient, nms);
+
+ JSONObject activitiesJson = restClient.getOutput().getJSONObject(
+ FN_SCHEDULER_BULK_ACT_ROOT);
+ Object activities = activitiesJson.get(FN_SCHEDULER_ACT_ROOT);
+ assertEquals("Number of activities is wrong", expectedCount,
+ ((JSONArray) activities).length());
+
+
+ // Validate if response does not exceed max 500
+ // Here expectedCount is only the requested count; the assertion below
+ // compares against RMWebServices.MAX_ACTIVITIES_COUNT instead.
+ expectedCount = 1000;
+ restClient = new RESTClient(expectedCount);
+ restClient.start();
+
+ sendHeartbeat(restClient, nms);
+
+ activitiesJson = restClient.getOutput().getJSONObject(
+ FN_SCHEDULER_BULK_ACT_ROOT);
+ activities = activitiesJson.get(FN_SCHEDULER_ACT_ROOT);
+ assertEquals("Max Activities Limit does not work",
+ RMWebServices.MAX_ACTIVITIES_COUNT,
+ ((JSONArray) activities).length());
+
+ } finally {
+ rm.stop();
+ }
+ }
+
+ /**
+ * Background client that issues one GET against the scheduler
+ * bulk-activities endpoint and keeps the JSON response for the test thread.
+ *
+ * The test thread polls {@link #isDone()} and reads {@link #getOutput()}
+ * only after it returns true. If the content-type assertion in
+ * {@link #run()} fails, {@code done} is never set.
+ */
+ private class RESTClient extends Thread {
+
+ // Value sent as the activitiesCount query parameter.
+ private final int expectedCount;
+ // Written by the client thread and polled by the test thread. Volatile so
+ // the poller is guaranteed to observe the update and, through it, the
+ // json reference assigned just before.
+ private volatile boolean done = false;
+ private JSONObject json;
+
+ RESTClient(int expectedCount) {
+ this.expectedCount = expectedCount;
+ }
+
+ boolean isDone() {
+ return done;
+ }
+
+ JSONObject getOutput() {
+ return json;
+ }
+
+ @Override
+ public void run() {
+ WebResource r = resource();
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ params.add(RMWSConsts.ACTIVITIES_COUNT, expectedCount);
+
+ ClientResponse response = r.path("ws").path("v1").path("cluster")
+ .path(RMWSConsts.SCHEDULER_BULK_ACTIVITIES).queryParams(params)
+ .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+
+ assertEquals(MediaType.APPLICATION_JSON_TYPE + "; "
+ + JettyUtils.UTF_8, response.getType().toString());
+ json = response.getEntity(JSONObject.class);
+ // Set the flag last so json is published before done becomes visible.
+ done = true;
+ }
+ }
+
+ /**
+ * Repeatedly heartbeats every given node manager until the REST client
+ * reports that it is done.
+ *
+ * The literals 10 and 20000 are handed to GenericTestUtils.waitFor
+ * (presumably the poll interval and the timeout, in milliseconds).
+ */
+ private void sendHeartbeat(RESTClient restClient, MockNM[] nms)
+ throws Exception {
+ GenericTestUtils.waitFor(() -> {
+ boolean heartbeatsSent = true;
+ try {
+ for (int i = 0; i < nms.length; i++) {
+ nms[i].nodeHeartbeat(true);
+ }
+ } catch (Exception heartbeatFailure) {
+ // A failed heartbeat makes this check evaluate to false.
+ heartbeatsSent = false;
+ }
+ return heartbeatsSent && restClient.isDone();
+ }, 10, 20000);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
index 90ca992..00a8beb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@@ -200,6 +201,15 @@ public class DefaultRequestInterceptorREST
}
@Override
+ public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
+ String groupBy, int activitiesCount) {
+ // Forward the GET to the bulk-activities path under the RM web service
+ // root. groupBy and activitiesCount are not passed explicitly here.
+ // NOTE(review): presumably genericForward reads them from hsr - confirm.
+ final String bulkActivitiesPath =
+ RMWSConsts.RM_WEB_SERVICE_PATH + RMWSConsts.SCHEDULER_BULK_ACTIVITIES;
+ return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
+ BulkActivitiesInfo.class, HTTPMethods.GET, bulkActivitiesPath,
+ null, null, getConf());
+ }
+
+ @Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds, String groupBy, String limit,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index b14da6c..ab97b1a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -88,6 +88,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.RouterMetrics;
import org.apache.hadoop.yarn.server.router.RouterServerUtil;
@@ -1148,6 +1149,12 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
}
@Override
+ public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
+ String groupBy, int activitiesCount) throws InterruptedException {
+ // This interceptor has no bulk-activities support: every call fails.
+ final String message = "Code is not implemented";
+ throw new NotImplementedException(message);
+ }
+
+ @Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds, String groupBy, String limit,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java
index 4c694fb..bde4648 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@@ -95,6 +96,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DEFAULT_ACTIVITIES_COUNT;
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DEFAULT_SUMMARIZE;
/**
@@ -458,6 +460,23 @@ public class RouterWebServices implements RMWebServiceProtocol {
}
@GET
+ @Path(RMWSConsts.SCHEDULER_BULK_ACTIVITIES)
+ @Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
+ MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
+ @Override
+ public BulkActivitiesInfo getBulkActivities(
+ @Context HttpServletRequest hsr,
+ @QueryParam(RMWSConsts.GROUP_BY) String groupBy,
+ @QueryParam(RMWSConsts.ACTIVITIES_COUNT)
+ @DefaultValue(DEFAULT_ACTIVITIES_COUNT) int activitiesCount)
+ throws InterruptedException {
+ init();
+ // Delegate to the root interceptor of the chain obtained for this request.
+ return getInterceptorChain(hsr).getRootInterceptor()
+ .getBulkActivities(hsr, groupBy, activitiesCount);
+ }
+
+ @GET
@Path(RMWSConsts.SCHEDULER_APP_ACTIVITIES)
@Produces({ MediaType.APPLICATION_JSON + "; " + JettyUtils.UTF_8,
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
index b626a8a..05a088d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.router.Router;
import org.apache.hadoop.yarn.server.router.webapp.RouterWebServices.RequestInterceptorChainWrapper;
@@ -179,6 +180,12 @@ public abstract class BaseRouterWebServicesTest {
createHttpServletRequest(user), null, null);
}
+ protected BulkActivitiesInfo getBulkActivities(String user)
+ throws InterruptedException {
+ // Calls the router web service on behalf of the given user, passing no
+ // groupBy and an activities count of 0.
+ final String groupBy = null;
+ final int activitiesCount = 0;
+ return routerWebService.getBulkActivities(
+ createHttpServletRequest(user), groupBy, activitiesCount);
+ }
+
protected AppActivitiesInfo getAppActivities(String user)
throws IOException, InterruptedException {
return routerWebService.getAppActivities(createHttpServletRequest(user),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java
index f6dbb7f..67c9d67 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@@ -139,6 +140,12 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
}
@Override
+ public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
+ String groupBy, int activitiesCount) throws InterruptedException {
+ // The mock ignores its arguments and answers with a newly constructed
+ // BulkActivitiesInfo.
+ BulkActivitiesInfo freshInfo = new BulkActivitiesInfo();
+ return freshInfo;
+ }
+
+ @Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds, String groupBy, String limit,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java
index 55de7a4..142a651 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmi
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceOptionInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.BulkActivitiesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
@@ -167,6 +168,13 @@ public class PassThroughRESTRequestInterceptor
}
@Override
+ public BulkActivitiesInfo getBulkActivities(HttpServletRequest hsr,
+ String groupBy, int activitiesCount) throws InterruptedException {
+ // Pure pass-through: hand the call, unchanged, to the next interceptor.
+ BulkActivitiesInfo forwarded = getNextInterceptor()
+ .getBulkActivities(hsr, groupBy, activitiesCount);
+ return forwarded;
+ }
+
+ @Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
Set<String> allocationRequestIds, String groupBy, String limit,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
index a30221d..879075e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/ResourceManagerRest.md
@@ -5746,6 +5746,140 @@ Response Body:
```
+Scheduler Bulk Activities API
+--------------------------------
+
+ The scheduler bulk activities RESTful API can fetch scheduler activities info recorded for multiple scheduling cycles. This may take time
+to return as it internally waits until the number of records specified by activitiesCount has been generated.
+
+### URI
+
+ * http://rm-http-address:port/ws/v1/cluster/scheduler/bulk-activities
+
+### HTTP Operations Supported
+
+ * GET
+
+### Query Parameters Supported
+
+Multiple parameters can be specified for GET operations.
+
+ * activitiesCount - number of scheduling cycles to record, with a maximum of 500.
+ * groupBy - aggregation type of application activities; currently only "diagnostic" is supported, with which
+ the user can query aggregated activities grouped by allocation state and diagnostic.
+
+
+### Response Examples
+
+**JSON response**
+
+HTTP Request:
+
+ Accept: application/json
+ GET http://rm-http-address:port/ws/v1/cluster/scheduler/bulk-activities?activitiesCount=2
+
+Response Header:
+
+ HTTP/1.1 200 OK
+ Content-Type: application/json
+ Transfer-Encoding: chunked
+ Server: Jetty(6.1.26)
+
+Response Body:
+
+Following is an output example with query parameter activitiesCount set to 2. This fetches scheduler activities info
+recorded in the last two scheduling cycles.
+
+```json
+{
+ "bulkActivities": {
+ "activities": [
+ {
+ "nodeId": "127.0.0.1:1234",
+ "timestamp": 1593684431432,
+ "dateTime": "Thu Jul 02 10:07:11 UTC 2020",
+ "allocations": [
+ {
+ "partition": "",
+ "finalAllocationState": "SKIPPED",
+ "root": {
+ "name": "root",
+ "allocationState": "SKIPPED",
+ "diagnostic": "Queue does not need more resource"
+ }
+ }
+ ]
+ },
+ {
+ "nodeId": "127.0.0.2:1234",
+ "timestamp": 1593684431432,
+ "dateTime": "Thu Jul 02 10:07:11 UTC 2020",
+ "allocations": [
+ {
+ "partition": "",
+ "finalAllocationState": "SKIPPED",
+ "root": {
+ "name": "root",
+ "allocationState": "SKIPPED",
+ "diagnostic": "Queue does not need more resource"
+ }
+ }
+ ]
+ }
+ ]
+ }
+}
+```
+
+**XML response**
+
+HTTP Request:
+
+ Accept: application/xml
+ GET http://rm-http-address:port/ws/v1/cluster/scheduler/bulk-activities?activitiesCount=2
+
+Response Header:
+
+ HTTP/1.1 200 OK
+ Content-Type: application/xml; charset=utf-8
+ Transfer-Encoding: chunked
+
+Response Body:
+
+```xml
+<bulkActivities>
+ <activities>
+ <nodeId>127.0.0.1:1234</nodeId>
+ <timestamp>1593683816380</timestamp>
+ <dateTime>Thu Jul 02 09:56:56 UTC 2020</dateTime>
+ <allocations>
+ <partition/>
+ <finalAllocationState>SKIPPED</finalAllocationState>
+ <root>
+ <name>root</name>
+ <allocationState>SKIPPED</allocationState>
+ <diagnostic>Queue does not need more resource</diagnostic>
+ </root>
+ </allocations>
+ </activities>
+ <activities>
+ <nodeId>127.0.0.2:1234</nodeId>
+ <timestamp>1593683816385</timestamp>
+ <dateTime>Thu Jul 02 09:56:56 UTC 2020</dateTime>
+ <allocations>
+ <partition/>
+ <finalAllocationState>SKIPPED</finalAllocationState>
+ <root>
+ <name>root</name>
+ <allocationState>SKIPPED</allocationState>
+ <diagnostic>Queue does not need more resource</diagnostic>
+ </root>
+ </allocations>
+ </activities>
+</bulkActivities>
+```
+
+
Application Activities API
--------------------------------
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org