You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ww...@apache.org on 2019/06/13 03:01:03 UTC
[hadoop] branch trunk updated: YARN-9578. Add
limit/actions/summarize options for app activities REST API. Contributed by
Tao Yang.
This is an automated email from the ASF dual-hosted git repository.
wwei pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 970b0b0 YARN-9578. Add limit/actions/summarize options for app activities REST API. Contributed by Tao Yang.
970b0b0 is described below
commit 970b0b0c02bb8fbe8ff227c78e2332df623d1aea
Author: Weiwei Yang <ww...@apache.org>
AuthorDate: Thu Jun 13 10:44:47 2019 +0800
YARN-9578. Add limit/actions/summarize options for app activities REST API. Contributed by Tao Yang.
---
.../scheduler/activities/ActivitiesManager.java | 56 ++-
.../scheduler/activities/AppAllocation.java | 11 +-
.../server/resourcemanager/webapp/RMWSConsts.java | 13 +-
.../webapp/RMWebServiceProtocol.java | 7 +-
.../resourcemanager/webapp/RMWebServices.java | 73 +++-
.../webapp/dao/AppAllocationInfo.java | 3 +-
.../activities/TestActivitiesManager.java | 112 +++++-
.../webapp/ActivitiesTestUtils.java | 15 +
.../TestRMWebServicesSchedulerActivities.java | 407 ++++++++++++++-------
...esSchedulerActivitiesWithMultiNodesEnabled.java | 44 +--
.../webapp/DefaultRequestInterceptorREST.java | 3 +-
.../router/webapp/FederationInterceptorREST.java | 3 +-
.../server/router/webapp/RouterWebServices.java | 11 +-
.../router/webapp/BaseRouterWebServicesTest.java | 4 +-
.../router/webapp/MockRESTRequestInterceptor.java | 3 +-
.../webapp/PassThroughRESTRequestInterceptor.java | 6 +-
16 files changed, 570 insertions(+), 201 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
index 2c31472..4149ac1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/ActivitiesManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -121,7 +122,8 @@ public class ActivitiesManager extends AbstractService {
public AppActivitiesInfo getAppActivitiesInfo(ApplicationId applicationId,
Set<String> requestPriorities, Set<String> allocationRequestIds,
- RMWSConsts.ActivitiesGroupBy groupBy) {
+ RMWSConsts.ActivitiesGroupBy groupBy, int limit, boolean summarize,
+ double maxTimeInSeconds) {
RMApp app = rmContext.getRMApps().get(applicationId);
if (app != null && app.getFinalApplicationStatus()
== FinalApplicationStatus.UNDEFINED) {
@@ -140,6 +142,17 @@ public class ActivitiesManager extends AbstractService {
allocations = new ArrayList(curAllocations);
}
}
+ if (summarize && allocations != null) {
+ AppAllocation summaryAppAllocation =
+ getSummarizedAppAllocation(allocations, maxTimeInSeconds);
+ if (summaryAppAllocation != null) {
+ allocations = Lists.newArrayList(summaryAppAllocation);
+ }
+ }
+ if (allocations != null && limit > 0 && limit < allocations.size()) {
+ allocations =
+ allocations.subList(allocations.size() - limit, allocations.size());
+ }
return new AppActivitiesInfo(allocations, applicationId, groupBy);
} else {
return new AppActivitiesInfo(
@@ -148,6 +161,47 @@ public class ActivitiesManager extends AbstractService {
}
}
+ /**
+ * Builds a single summarized app allocation from multiple allocations:
+ * 1. Walk the allocations from the last element backwards, stopping at the
+ * first one recorded more than {@code maxTimeInSeconds} before the last
+ * allocation, and keep the first allocation attempt encountered for each
+ * combination of request priority, allocation request ID and node ID.
+ * Attempts without a node ID are left out of the summary.
+ * 2. Copy priority, queue name, app state, time and diagnostic from the
+ * last allocation.
+ *
+ * @param allocations app allocations to summarize; the code treats the last
+ * element as the most recent one
+ * @param maxTimeInSeconds length of the time window, in seconds, counted
+ * back from the time of the last allocation
+ * @return the summarized allocation, or null if allocations is null or empty
+ */
+ private AppAllocation getSummarizedAppAllocation(
+ List<AppAllocation> allocations, double maxTimeInSeconds) {
+ if (allocations == null || allocations.isEmpty()) {
+ return null;
+ }
+ long startTime = allocations.get(allocations.size() - 1).getTime()
+ - (long) (maxTimeInSeconds * 1000);
+ Map<String, ActivityNode> nodeActivities = new HashMap<>();
+ for (int i = allocations.size() - 1; i >= 0; i--) {
+ AppAllocation appAllocation = allocations.get(i);
+ if (startTime > appAllocation.getTime()) {
+ break; // recorded before the time window: stop scanning
+ }
+ List<ActivityNode> activityNodes = appAllocation.getAllocationAttempts();
+ for (ActivityNode an : activityNodes) {
+ if (an.getNodeId() != null) {
+ nodeActivities.putIfAbsent( // an entry already stored for this key wins
+ an.getRequestPriority() + "_" + an.getAllocationRequestId() + "_"
+ + an.getNodeId(), an);
+ }
+ }
+ }
+ AppAllocation lastAppAllocation = allocations.get(allocations.size() - 1);
+ AppAllocation summarizedAppAllocation =
+ new AppAllocation(lastAppAllocation.getPriority(), null,
+ lastAppAllocation.getQueueName());
+ summarizedAppAllocation
+ .updateAppContainerStateAndTime(null, lastAppAllocation.getAppState(),
+ lastAppAllocation.getTime(), lastAppAllocation.getDiagnostic());
+ summarizedAppAllocation
+ .setAllocationAttempts(new ArrayList<>(nodeActivities.values()));
+ return summarizedAppAllocation;
+ }
+
public ActivitiesInfo getActivitiesInfo(String nodeId,
RMWSConsts.ActivitiesGroupBy groupBy) {
List<NodeAllocation> allocations;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
index 69d6ccf..e226b50 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/AppAllocation.java
@@ -84,11 +84,8 @@ public class AppAllocation {
return appState;
}
- public String getPriority() {
- if (priority == null) {
- return null;
- }
- return priority.toString();
+ public Priority getPriority() {
+ return priority;
}
public String getContainerId() {
@@ -128,4 +125,8 @@ public class AppAllocation {
.collect(Collectors.toList());
return appAllocation;
}
+
+ public void setAllocationAttempts(List<ActivityNode> allocationAttempts) {
+ this.allocationAttempts = allocationAttempts; // stores the given list reference as-is, without copying
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
index b7a6008..f2d2b82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWSConsts.java
@@ -71,7 +71,7 @@ public final class RMWSConsts {
/** Path for {@code RMWebServiceProtocol#getAppActivities}. */
public static final String SCHEDULER_APP_ACTIVITIES =
- "/scheduler/app-activities";
+ "/scheduler/app-activities/{appid}";
/** Path for {@code RMWebServiceProtocol#getAppStatistics}. */
public static final String APP_STATISTICS = "/appstatistics";
@@ -237,6 +237,8 @@ public final class RMWSConsts {
public static final String GROUP_BY = "groupBy";
public static final String SIGNAL = "signal";
public static final String COMMAND = "command";
+ public static final String ACTIONS = "actions";
+ public static final String SUMMARIZE = "summarize";
private RMWSConsts() {
// not called
@@ -250,4 +252,13 @@ public final class RMWSConsts {
public enum ActivitiesGroupBy {
DIAGNOSTIC
}
+
+ /**
+ * Defines the actions that can be required by an app activities request:
+ * REFRESH means to turn on activities recording for the required app,
+ * GET means the required app activities should be included in the response.
+ */
+ public enum AppActivitiesRequiredAction {
+ REFRESH, GET
+ }
}
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java
index 3aa2593..a5bd93b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServiceProtocol.java
@@ -227,11 +227,16 @@ public interface RMWebServiceProtocol {
* the activities. It is a QueryParam.
* @param groupBy the groupBy type by which the activities should be
* aggregated. It is a QueryParam.
+ * @param limit the maximum number of app allocations to return; must be a
+ * positive integer. It is a QueryParam.
+ * @param actions the required actions of app activities. It is a QueryParam.
+ * @param summarize whether app activities in multiple scheduling processes
+ * need to be summarized. It is a QueryParam.
* @return all the activities about a specific app for a specific time
*/
AppActivitiesInfo getAppActivities(HttpServletRequest hsr, String appId,
String time, Set<String> requestPriorities,
- Set<String> allocationRequestIds, String groupBy);
+ Set<String> allocationRequestIds, String groupBy, String limit,
+ Set<String> actions, boolean summarize);
/**
* This method retrieves all the statistics for a specific app, and it is
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
index 3f01035..762569f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java
@@ -236,6 +236,7 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
public static final String DEFAULT_START_TIME = "0";
public static final String DEFAULT_END_TIME = "-1";
public static final String DEFAULT_INCLUDE_RESOURCE = "false";
+ public static final String DEFAULT_SUMMARIZE = "false";
@VisibleForTesting
boolean isCentralizedNodeLabelConfiguration = true;
@@ -717,12 +718,16 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
MediaType.APPLICATION_XML + "; " + JettyUtils.UTF_8 })
@Override
public AppActivitiesInfo getAppActivities(@Context HttpServletRequest hsr,
- @QueryParam(RMWSConsts.APP_ID) String appId,
+ @PathParam(RMWSConsts.APPID) String appId,
@QueryParam(RMWSConsts.MAX_TIME) String time,
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
Set<String> allocationRequestIds,
- @QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
+ @QueryParam(RMWSConsts.GROUP_BY) String groupBy,
+ @QueryParam(RMWSConsts.LIMIT) String limit,
+ @QueryParam(RMWSConsts.ACTIONS) Set<String> actions,
+ @QueryParam(RMWSConsts.SUMMARIZE) @DefaultValue(DEFAULT_SUMMARIZE)
+ boolean summarize) {
initForReadableEndpoints();
YarnScheduler scheduler = rm.getRMContext().getScheduler();
@@ -749,6 +754,26 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return new AppActivitiesInfo(e.getMessage(), appId);
}
+ Set<RMWSConsts.AppActivitiesRequiredAction> requiredActions;
+ try {
+ requiredActions = parseAppActivitiesRequiredActions(actions);
+ } catch (IllegalArgumentException e) {
+ return new AppActivitiesInfo(e.getMessage(), appId);
+ }
+
+ int limitNum = -1;
+ if (limit != null) {
+ try {
+ limitNum = Integer.parseInt(limit);
+ if (limitNum <= 0) {
+ return new AppActivitiesInfo(
+ "limit must be greater than 0!", appId);
+ }
+ } catch (NumberFormatException e) {
+ return new AppActivitiesInfo("limit must be integer!", appId);
+ }
+ }
+
double maxTime = 3.0;
if (time != null) {
@@ -762,12 +787,21 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
ApplicationId applicationId;
try {
applicationId = ApplicationId.fromString(appId);
- activitiesManager.turnOnAppActivitiesRecording(applicationId, maxTime);
- AppActivitiesInfo appActivitiesInfo =
- activitiesManager.getAppActivitiesInfo(applicationId,
- requestPriorities, allocationRequestIds, activitiesGroupBy);
-
- return appActivitiesInfo;
+ if (requiredActions
+ .contains(RMWSConsts.AppActivitiesRequiredAction.REFRESH)) {
+ activitiesManager
+ .turnOnAppActivitiesRecording(applicationId, maxTime);
+ }
+ if (requiredActions
+ .contains(RMWSConsts.AppActivitiesRequiredAction.GET)) {
+ AppActivitiesInfo appActivitiesInfo = activitiesManager
+ .getAppActivitiesInfo(applicationId, requestPriorities,
+ allocationRequestIds, activitiesGroupBy, limitNum,
+ summarize, maxTime);
+ return appActivitiesInfo;
+ }
+ return new AppActivitiesInfo("Successfully notified actions: "
+ + StringUtils.join(',', actions), appId);
} catch (Exception e) {
String errMessage = "Cannot find application with given appId";
LOG.error(errMessage, e);
@@ -778,6 +812,29 @@ public class RMWebServices extends WebServices implements RMWebServiceProtocol {
return null;
}
+ private Set<RMWSConsts.AppActivitiesRequiredAction>
+ parseAppActivitiesRequiredActions(Set<String> actions) {
+ Set<RMWSConsts.AppActivitiesRequiredAction> requiredActions =
+ new HashSet<>();
+ if (actions == null || actions.isEmpty()) { // no actions requested: default to both REFRESH and GET
+ requiredActions.add(RMWSConsts.AppActivitiesRequiredAction.REFRESH);
+ requiredActions.add(RMWSConsts.AppActivitiesRequiredAction.GET);
+ } else {
+ for (String action : actions) {
+ if (!EnumUtils.isValidEnum(RMWSConsts.AppActivitiesRequiredAction.class,
+ action.toUpperCase())) { // names are upper-cased first, so matching ignores case
+ String errMesasge =
+ "Got invalid action: " + action + ", valid actions: " + Arrays
+ .asList(RMWSConsts.AppActivitiesRequiredAction.values());
+ throw new IllegalArgumentException(errMesasge); // getAppActivities catches this and returns the message in the response
+ }
+ requiredActions.add(RMWSConsts.AppActivitiesRequiredAction
+ .valueOf(action.toUpperCase()));
+ }
+ }
+ return requiredActions;
+ }
+
private RMWSConsts.ActivitiesGroupBy parseActivitiesGroupBy(String groupBy) {
if (groupBy != null) {
if (!EnumUtils.isValidEnum(RMWSConsts.ActivitiesGroupBy.class,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
index 6b0d86b..6ae1f9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppAllocationInfo.java
@@ -54,7 +54,8 @@ public class AppAllocationInfo {
this.requestAllocation = new ArrayList<>();
this.nodeId = allocation.getNodeId();
this.queueName = allocation.getQueueName();
- this.appPriority = allocation.getPriority();
+ this.appPriority = allocation.getPriority() == null ?
+ null : allocation.getPriority().toString();
this.timestamp = allocation.getTime();
this.dateTime = new Date(allocation.getTime()).toString();
this.allocationState = allocation.getAppState().name();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
index 495c7e2..2bf6b23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/activities/TestActivitiesManager.java
@@ -29,6 +29,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -286,18 +288,124 @@ public class TestActivitiesManager {
ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
}
AppActivitiesInfo appActivitiesInfo = newActivitiesManager
- .getAppActivitiesInfo(app.getApplicationId(), null, null, null);
+ .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
+ false, 3);
Assert.assertEquals(numActivities,
appActivitiesInfo.getAllocations().size());
// sleep until all app activities expired
Thread.sleep(cleanupIntervalMs + appActivitiesTTL);
// there should be no remaining app activities
appActivitiesInfo = newActivitiesManager
- .getAppActivitiesInfo(app.getApplicationId(), null, null, null);
+ .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
+ false, 3);
Assert.assertEquals(0,
appActivitiesInfo.getAllocations().size());
}
+ @Test (timeout = 30000)
+ public void testAppActivitiesPerformance() {
+ // Start recording activities for the first app.
+ SchedulerApplicationAttempt app = apps.get(0);
+ FiCaSchedulerNode node = (FiCaSchedulerNode) nodes.get(0);
+ activitiesManager.turnOnAppActivitiesRecording(app.getApplicationId(), 100);
+ int numActivities = 100; // number of app allocations recorded below
+ int numNodes = 10000; // activities per allocation, one per distinct node ID (host0, host1, ...)
+ int testingTimes = 10; // timed repetitions for each kind of query
+ for (int ano = 0; ano < numActivities; ano++) {
+ ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node,
+ SystemClock.getInstance().getTime(), app);
+ for (int i = 0; i < numNodes; i++) {
+ NodeId nodeId = NodeId.newInstance("host" + i, 0);
+ activitiesManager
+ .addSchedulingActivityForApp(app.getApplicationId(), null, "0",
+ ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.FAIL_TO_ALLOCATE, "container",
+ nodeId, "0");
+ }
+ ActivitiesLogger.APP
+ .finishAllocatedAppAllocationRecording(activitiesManager,
+ app.getApplicationId(), null, ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.SKIPPED_ALL_PRIORITIES);
+ }
+
+ // Warm up: the first query often takes longer, so run it once untimed
+ activitiesManager
+ .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
+ true, 100);
+
+ // Test getting normal app activities (no groupBy, summarize off)
+ Supplier<Void> normalSupplier = () -> {
+ AppActivitiesInfo appActivitiesInfo = activitiesManager
+ .getAppActivitiesInfo(app.getApplicationId(), null, null, null, -1,
+ false, 100);
+ Assert.assertEquals(numActivities,
+ appActivitiesInfo.getAllocations().size());
+ Assert.assertEquals(1,
+ appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
+ .size());
+ Assert.assertEquals(numNodes,
+ appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
+ .get(0).getAllocationAttempt().size());
+ return null;
+ };
+ testManyTimes("Getting normal app activities", normalSupplier,
+ testingTimes);
+
+ // Test getting aggregated app activities (grouped by diagnostic, summarize off)
+ Supplier<Void> aggregatedSupplier = () -> {
+ AppActivitiesInfo appActivitiesInfo = activitiesManager
+ .getAppActivitiesInfo(app.getApplicationId(), null, null,
+ RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, false, 100);
+ Assert.assertEquals(numActivities,
+ appActivitiesInfo.getAllocations().size());
+ Assert.assertEquals(1,
+ appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
+ .size());
+ Assert.assertEquals(1,
+ appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
+ .get(0).getAllocationAttempt().size());
+ Assert.assertEquals(numNodes,
+ appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
+ .get(0).getAllocationAttempt().get(0).getNodeIds().size());
+ return null;
+ };
+ testManyTimes("Getting aggregated app activities", aggregatedSupplier,
+ testingTimes);
+
+ // Test getting summarized app activities (grouped by diagnostic, summarize on)
+ Supplier<Void> summarizedSupplier = () -> {
+ AppActivitiesInfo appActivitiesInfo = activitiesManager
+ .getAppActivitiesInfo(app.getApplicationId(), null, null,
+ RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC, -1, true, 100);
+ Assert.assertEquals(1, appActivitiesInfo.getAllocations().size());
+ Assert.assertEquals(1,
+ appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
+ .size());
+ Assert.assertEquals(1,
+ appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
+ .get(0).getAllocationAttempt().size());
+ Assert.assertEquals(numNodes,
+ appActivitiesInfo.getAllocations().get(0).getRequestAllocation()
+ .get(0).getAllocationAttempt().get(0).getNodeIds().size());
+ return null;
+ };
+ testManyTimes("Getting summarized app activities", summarizedSupplier,
+ testingTimes);
+ }
+
+ private void testManyTimes(String testingName, // label printed in front of the timing results
+ Supplier<Void> supplier, int testingTimes) {
+ long totalTime = 0;
+ for (int i = 0; i < testingTimes; i++) {
+ long startTime = System.currentTimeMillis();
+ supplier.get();
+ totalTime += System.currentTimeMillis() - startTime; // wall-clock milliseconds spent in this run
+ }
+ System.out.println("#" + testingName + ", testing times : " + testingTimes
+ + ", total cost time : " + totalTime + " ms, average cost time : "
+ + (float) totalTime / testingTimes + " ms."); // the timings are only printed, never asserted
+ }
+
/**
* Testing activities manager which can record all history information about
* node allocations.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java
index da89862..666e5fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/ActivitiesTestUtils.java
@@ -41,6 +41,8 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.function.Predicate;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static org.junit.Assert.assertEquals;
@@ -209,4 +211,17 @@ public final class ActivitiesTestUtils {
response.getType().toString());
return response.getEntity(JSONObject.class);
}
+
+ /**
+ * Replaces every {name} placeholder (HTTP path-template style) in the given
+ * format with %s (Java format style) and then formats the result with the
+ * supplied arguments.
+ * NOTE(review): a literal % in the format is handed to String.format
+ * unescaped.
+ * @param format Initial format using {name} placeholders.
+ * @param args Arguments substituted, in order, for the placeholders.
+ * @return The formatted string with the placeholders filled in.
+ */
+ public static String format(String format, Object... args) {
+ Pattern p = Pattern.compile("\\{.*?}");
+ Matcher m = p.matcher(format);
+ String newFormat = m.replaceAll("%s");
+ return String.format(newFormat, args);
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
index 1e08f05..8bdecb7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivities.java
@@ -437,25 +437,17 @@ public class TestRMWebServicesSchedulerActivities
RMApp app1 = rm.submitApp(10, "app1", "user1", null, "b1");
//Get JSON
- WebResource r = resource();
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
- ClientResponse response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- JSONObject json = response.getEntity(JSONObject.class);
+ ActivitiesTestUtils.requestWebResource(r, params);
+
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
//Check app activities
verifyNumberOfAllocations(json, 1);
@@ -502,25 +494,17 @@ public class TestRMWebServicesSchedulerActivities
10)), null);
//Get JSON
- WebResource r = resource();
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
- ClientResponse response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- JSONObject json = response.getEntity(JSONObject.class);
+ ActivitiesTestUtils.requestWebResource(r, params);
+
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 10);
@@ -555,26 +539,17 @@ public class TestRMWebServicesSchedulerActivities
10)), null);
//Get JSON
- WebResource r = resource();
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
- ClientResponse response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- JSONObject json = response.getEntity(JSONObject.class);
+ ActivitiesTestUtils.requestWebResource(r, params);
+
nm.nodeHeartbeat(true);
Thread.sleep(5000);
//Get JSON
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 0);
} finally {
rm.stop();
@@ -590,24 +565,14 @@ public class TestRMWebServicesSchedulerActivities
RMApp app1 = rm.submitApp(1024, "app1", "user1", null, "b1");
//Get JSON
- WebResource r = resource();
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
- ClientResponse response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- JSONObject json = response.getEntity(JSONObject.class);
+ ActivitiesTestUtils.requestWebResource(r, params);
//Get JSON
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 0);
} finally {
rm.stop();
@@ -639,49 +604,23 @@ public class TestRMWebServicesSchedulerActivities
10)), null);
// Reserve new container
- WebResource r = resource();
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
- ClientResponse response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- JSONObject json = response.getEntity(JSONObject.class);
+ ActivitiesTestUtils.requestWebResource(r, params);
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
// Do a node heartbeat again without releasing container from app2
- r = resource();
- params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
+ json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 2);
// Finish application 2
@@ -693,26 +632,10 @@ public class TestRMWebServicesSchedulerActivities
RMContainerEventType.FINISHED);
// Do a node heartbeat again
- r = resource();
- params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
nm2.nodeHeartbeat(true);
Thread.sleep(1000);
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
+ json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 3);
} finally {
rm.stop();
@@ -847,15 +770,11 @@ public class TestRMWebServicesSchedulerActivities
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
- WebResource r = resource();
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
- ClientResponse response = r.path("ws").path("v1").path("cluster")
- .path("scheduler/app-activities").queryParams(params)
- .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- JSONObject json = response.getEntity(JSONObject.class);
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getString("diagnostic"));
@@ -867,14 +786,7 @@ public class TestRMWebServicesSchedulerActivities
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
- response =
- r.path("ws").path("v1").path("cluster")
- .path("scheduler/app-activities").queryParams(params)
- .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
+ json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = json.getJSONObject("allocations");
JSONObject requestAllocationObj =
@@ -904,15 +816,11 @@ public class TestRMWebServicesSchedulerActivities
RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
- WebResource r = resource();
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
- ClientResponse response = r.path("ws").path("v1").path("cluster")
- .path("scheduler/app-activities").queryParams(params)
- .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- JSONObject json = response.getEntity(JSONObject.class);
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getString("diagnostic"));
@@ -930,14 +838,7 @@ public class TestRMWebServicesSchedulerActivities
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
- response =
- r.path("ws").path("v1").path("cluster")
- .path("scheduler/app-activities").queryParams(params)
- .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
+ json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
JSONObject allocationObj = json.getJSONObject("allocations");
JSONObject requestAllocationObj =
@@ -967,9 +868,9 @@ public class TestRMWebServicesSchedulerActivities
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
- .path(RMWSConsts.SCHEDULER_APP_ACTIVITIES);
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add("appId", app1.getApplicationId().toString());
JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display",
json.getString("diagnostic"));
@@ -1064,4 +965,228 @@ public class TestRMWebServicesSchedulerActivities
rm.stop();
}
}
+
+ @Test(timeout = 30000)
+ public void testAppLimit() throws Exception {
+ rm.start();
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * 1024);
+ MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * 1024);
+ try {
+ RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
+ assertEquals("waiting for display",
+ json.getString("diagnostic"));
+
+ // am1 asks for 1 * 5GB container
+ am1.allocate("*", 5120, 1, new ArrayList<>());
+ // trigger scheduling 3 times, leaving 3 app activities in the cache
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+
+ // query all app activities without limit
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ verifyNumberOfAllocations(json, 3);
+
+ // query all app activities with limit > 3
+ params.putSingle(RMWSConsts.LIMIT, "10");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ verifyNumberOfAllocations(json, 3);
+
+ // query app activities with limit = 2
+ params.putSingle(RMWSConsts.LIMIT, "2");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ verifyNumberOfAllocations(json, 2);
+
+ // query app activities with limit = 1
+ params.putSingle(RMWSConsts.LIMIT, "1");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ verifyNumberOfAllocations(json, 1);
+
+ // query all app activities with invalid limit
+ params.putSingle(RMWSConsts.LIMIT, "STRING");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ assertEquals("limit must be integer!", json.getString("diagnostic"));
+
+ // query all app activities with limit = 0
+ params.putSingle(RMWSConsts.LIMIT, "0");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ assertEquals("limit must be greater than 0!",
+ json.getString("diagnostic"));
+
+ // query all app activities with limit < 0
+ params.putSingle(RMWSConsts.LIMIT, "-3");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ assertEquals("limit must be greater than 0!",
+ json.getString("diagnostic"));
+ } finally {
+ rm.stop();
+ }
+ }
+
+ @Test(timeout = 30000)
+ public void testAppActions() throws Exception {
+ rm.start();
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024);
+ try {
+ RMApp app1 = rm.submitApp(512, "app1", "user1", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+ // am1 asks for 10 * 512MB containers
+ am1.allocate("*", 512, 10, new ArrayList<>());
+
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ params.add("maxTime", 1); //only last for 1 second
+
+ // testing invalid action
+ params.add(RMWSConsts.ACTIONS, "get");
+ params.add(RMWSConsts.ACTIONS, "invalid-action");
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
+ assertTrue(json.getString("diagnostic").startsWith("Got invalid action"));
+
+ /*
+ * testing get action
+ */
+ params.putSingle(RMWSConsts.ACTIONS, "get");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ assertEquals("waiting for display", json.getString("diagnostic"));
+
+ // trigger scheduling
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+
+ // app activities won't be recorded
+ params.putSingle(RMWSConsts.ACTIONS, "get");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ assertEquals("waiting for display", json.getString("diagnostic"));
+
+ // trigger scheduling
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+
+ /*
+ * testing refresh action
+ */
+ params.putSingle(RMWSConsts.ACTIONS, "refresh");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ assertEquals("Successfully notified actions: refresh",
+ json.getString("diagnostic"));
+
+ // trigger scheduling
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+ Thread.sleep(1000);
+
+ // app activities should be recorded
+ params.putSingle(RMWSConsts.ACTIONS, "get");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ verifyNumberOfAllocations(json, 1);
+
+ // trigger scheduling
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+ Thread.sleep(1000);
+
+ /*
+ * testing refresh and get actions
+ */
+ params.remove(RMWSConsts.ACTIONS);
+ params.add(RMWSConsts.ACTIONS, "refresh");
+ params.add(RMWSConsts.ACTIONS, "get");
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ verifyNumberOfAllocations(json, 1);
+
+ // trigger scheduling
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+ Thread.sleep(1000);
+
+ // more app activities should be recorded
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ verifyNumberOfAllocations(json, 2);
+
+ // trigger scheduling
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+ Thread.sleep(1000);
+
+ // more app activities should be recorded
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+ verifyNumberOfAllocations(json, 3);
+ } finally {
+ rm.stop();
+ }
+ }
+
+ @Test(timeout=30000)
+ public void testAppSummary() throws Exception {
+ rm.start();
+ CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+ MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * 1024);
+ MockNM nm2 = rm.registerNode("127.0.0.2:1234", 4 * 1024);
+ MockNM nm3 = rm.registerNode("127.0.0.3:1234", 4 * 1024);
+
+ try {
+ RMApp app1 = rm.submitApp(5120, "app1", "user1", null, "b1");
+
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
+ MultivaluedMapImpl params = new MultivaluedMapImpl();
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
+ assertEquals("waiting for display",
+ json.getString("diagnostic"));
+
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
+ // am1 asks for 1 * 5GB container
+ am1.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(0), "*",
+ Resources.createResource(5 * 1024), 1)), null);
+ // trigger scheduling
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm2.getNodeId())));
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm3.getNodeId())));
+ cs.handle(new NodeUpdateSchedulerEvent(
+ rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
+
+ params.add(RMWSConsts.SUMMARIZE, "true");
+ params.add(RMWSConsts.GROUP_BY, RMWSConsts.ActivitiesGroupBy.DIAGNOSTIC);
+ json = ActivitiesTestUtils.requestWebResource(r, params);
+
+ // verify that response contains an allocation summary for all nodes
+ verifyNumberOfAllocations(json, 1);
+ JSONObject allocation = json.getJSONObject("allocations");
+ JSONObject reqestAllocation =
+ allocation.getJSONObject("requestAllocation");
+ JSONArray attempts = reqestAllocation.getJSONArray("allocationAttempt");
+ assertEquals(2, attempts.length());
+ for (int i = 0; i < attempts.length(); i++) {
+ JSONObject attempt = attempts.getJSONObject(i);
+ if (attempt.getString("allocationState").equals("SKIPPED")) {
+ JSONArray nodeIds = attempt.optJSONArray("nodeIds");
+ assertEquals(2, nodeIds.length());
+ } else if (attempt.getString("allocationState").equals("RESERVED")) {
+ assertEquals(nm1.getNodeId().toString(),
+ attempt.getString("nodeIds"));
+ }
+ }
+ } finally {
+ rm.stop();
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
index 8383a0d..8998221 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled.java
@@ -249,15 +249,11 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
1)), null);
//Trigger recording for this app
- WebResource r = resource();
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
- ClientResponse response = r.path("ws").path("v1").path("cluster")
- .path("scheduler/app-activities").queryParams(params)
- .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- JSONObject json = response.getEntity(JSONObject.class);
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display", json.getString("diagnostic"));
//Trigger scheduling for this app
@@ -267,12 +263,7 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
//Check app activities, it should contain one allocation and
// final allocation state is ALLOCATED
- response = r.path("ws").path("v1").path("cluster")
- .path("scheduler/app-activities").queryParams(params)
- .accept(MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
+ json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 1);
@@ -382,16 +373,11 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
RMApp app1 = rm.submitApp(3072, "app1", "user1", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
- WebResource r = resource();
+ WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
-
- ClientResponse response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- JSONObject json = response.getEntity(JSONObject.class);
+ JSONObject json = ActivitiesTestUtils.requestWebResource(r, params);
assertEquals("waiting for display", json.getString("diagnostic"));
//Request two containers with different priority for am1
@@ -409,14 +395,8 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
cs.handle(new NodeUpdateSchedulerEvent(
rm.getRMContext().getRMNodes().get(nm1.getNodeId())));
- response = r.path("ws").path("v1").path("cluster").path(
- "scheduler/app-activities").queryParams(params).accept(
- MediaType.APPLICATION_JSON).get(ClientResponse.class);
- assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
- response.getType().toString());
- json = response.getEntity(JSONObject.class);
-
//Check app activities
+ json = ActivitiesTestUtils.requestWebResource(r, params);
verifyNumberOfAllocations(json, 2);
JSONArray allocationArray = json.getJSONArray("allocations");
//Check first activity is for second allocation with RESERVED state
@@ -539,9 +519,9 @@ public class TestRMWebServicesSchedulerActivitiesWithMultiNodesEnabled
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
WebResource r = resource().path(RMWSConsts.RM_WEB_SERVICE_PATH)
- .path(RMWSConsts.SCHEDULER_APP_ACTIVITIES);
+ .path(ActivitiesTestUtils.format(RMWSConsts.SCHEDULER_APP_ACTIVITIES,
+ app1.getApplicationId().toString()));
MultivaluedMapImpl params = new MultivaluedMapImpl();
- params.add(RMWSConsts.APP_ID, app1.getApplicationId().toString());
/*
* test non-exist groupBy
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
index 7e6f306..bf0dee6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
@@ -192,7 +192,8 @@ public class DefaultRequestInterceptorREST
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
- Set<String> allocationRequestIds, String groupBy) {
+ Set<String> allocationRequestIds, String groupBy, String limit,
+ Set<String> actions, boolean summarize) {
// time and appId are specified inside hsr
return RouterWebServiceUtil.genericForward(webAppAddress, hsr,
AppActivitiesInfo.class, HTTPMethods.GET,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 1c8b7a8..1ed5f59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -1146,7 +1146,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
- Set<String> allocationRequestIds, String groupBy) {
+ Set<String> allocationRequestIds, String groupBy, String limit,
+ Set<String> actions, boolean summarize) {
throw new NotImplementedException("Code is not implemented");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java
index 9327c6f..9327547 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServices.java
@@ -95,6 +95,8 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebServices.DEFAULT_SUMMARIZE;
+
/**
* RouterWebServices is a service that runs on each router that can be used to
* intercept and inspect {@link RMWebServiceProtocol} messages from client to
@@ -465,11 +467,16 @@ public class RouterWebServices implements RMWebServiceProtocol {
@QueryParam(RMWSConsts.REQUEST_PRIORITIES) Set<String> requestPriorities,
@QueryParam(RMWSConsts.ALLOCATION_REQUEST_IDS)
Set<String> allocationRequestIds,
- @QueryParam(RMWSConsts.GROUP_BY) String groupBy) {
+ @QueryParam(RMWSConsts.GROUP_BY) String groupBy,
+ @QueryParam(RMWSConsts.LIMIT) String limit,
+ @QueryParam(RMWSConsts.ACTIONS) Set<String> actions,
+ @QueryParam(RMWSConsts.SUMMARIZE) @DefaultValue(DEFAULT_SUMMARIZE)
+ boolean summarize) {
init();
RequestInterceptorChainWrapper pipeline = getInterceptorChain(hsr);
return pipeline.getRootInterceptor().getAppActivities(hsr, appId, time,
- requestPriorities, allocationRequestIds, groupBy);
+ requestPriorities, allocationRequestIds, groupBy, limit, actions,
+ summarize);
}
@GET
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
index 535c579..78aab5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
@@ -180,8 +180,8 @@ public abstract class BaseRouterWebServicesTest {
protected AppActivitiesInfo getAppActivities(String user)
throws IOException, InterruptedException {
- return routerWebService.getAppActivities(
- createHttpServletRequest(user), null, null, null, null, null);
+ return routerWebService.getAppActivities(createHttpServletRequest(user),
+ null, null, null, null, null, null, null, false);
}
protected ApplicationStatisticsInfo getAppStatistics(String user)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java
index f93b397..50200ed 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockRESTRequestInterceptor.java
@@ -141,7 +141,8 @@ public class MockRESTRequestInterceptor extends AbstractRESTRequestInterceptor {
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
- Set<String> allocationRequestIds, String groupBy) {
+ Set<String> allocationRequestIds, String groupBy, String limit,
+ Set<String> actions, boolean summarize) {
return new AppActivitiesInfo();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java
index 126610c..eb7222f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/PassThroughRESTRequestInterceptor.java
@@ -169,9 +169,11 @@ public class PassThroughRESTRequestInterceptor
@Override
public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
String appId, String time, Set<String> requestPriorities,
- Set<String> allocationRequestIds, String groupBy) {
+ Set<String> allocationRequestIds, String groupBy, String limit,
+ Set<String> actions, boolean summarize) {
return getNextInterceptor().getAppActivities(hsr, appId, time,
- requestPriorities, allocationRequestIds, groupBy);
+ requestPriorities, allocationRequestIds, groupBy, limit,
+ actions, summarize);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org