You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/08/29 21:53:58 UTC
hadoop git commit: YARN-7010. Federation: routing REST invocations
transparently to multiple RMs (part 2 - getApps). (Contributed by Giovanni
Matteo Fumarola via curino)
Repository: hadoop
Updated Branches:
refs/heads/trunk 63fc1b0b6 -> cc8893edc
YARN-7010. Federation: routing REST invocations transparently to multiple RMs (part 2 - getApps). (Contributed by Giovanni Matteo Fumarola via curino)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc8893ed
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc8893ed
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc8893ed
Branch: refs/heads/trunk
Commit: cc8893edc0b7960e958723c81062986c12f06ade
Parents: 63fc1b0
Author: Carlo Curino <cu...@apache.org>
Authored: Tue Aug 29 14:53:09 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue Aug 29 14:53:09 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 9 +
.../yarn/conf/TestYarnConfigurationFields.java | 2 +
.../server/uam/UnmanagedApplicationManager.java | 2 +-
.../resourcemanager/webapp/dao/AppInfo.java | 184 +++++++----
.../resourcemanager/webapp/dao/AppsInfo.java | 4 +
.../yarn/server/router/RouterMetrics.java | 33 ++
.../webapp/FederationInterceptorREST.java | 118 ++++++-
.../router/webapp/RouterWebServiceUtil.java | 109 ++++++-
.../yarn/server/router/TestRouterMetrics.java | 50 +++
.../MockDefaultRequestInterceptorREST.java | 49 ++-
.../webapp/TestFederationInterceptorREST.java | 17 +
.../TestFederationInterceptorRESTRetry.java | 45 +++
.../router/webapp/TestRouterWebServiceUtil.java | 311 +++++++++++++++++++
13 files changed, 855 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 16bd73a..67dfeeb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2753,6 +2753,15 @@ public class YarnConfiguration extends Configuration {
"org.apache.hadoop.yarn.server.router.webapp."
+ "DefaultRequestInterceptorREST";
+ /**
+ * The interceptor class used in FederationInterceptorREST should return
+ * partial AppReports.
+ */
+ public static final String ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
+ ROUTER_WEBAPP_PREFIX + "partial-result.enabled";
+ public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
+ false;
+
////////////////////////////////
// Other Configs
////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 153a35a..d97c6eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -159,6 +159,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
configurationPrefixToSkipCompare
.add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
+ configurationPrefixToSkipCompare
+ .add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
// Set by container-executor.cfg
configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 60a9a27..6531a75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -83,7 +83,7 @@ public class UnmanagedApplicationManager {
private static final Logger LOG =
LoggerFactory.getLogger(UnmanagedApplicationManager.class);
private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000;
- private static final String APP_NAME = "UnmanagedAM";
+ public static final String APP_NAME = "UnmanagedAM";
private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
index f11939a..9fb8fb5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields.DeSel
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
@XmlRootElement(name = "app")
@@ -71,9 +72,9 @@ public class AppInfo {
// these are ok for any user to see
protected String id;
protected String user;
- protected String name;
+ private String name;
protected String queue;
- protected YarnApplicationState state;
+ private YarnApplicationState state;
protected FinalApplicationStatus finalStatus;
protected float progress;
protected String trackingUI;
@@ -91,21 +92,21 @@ public class AppInfo {
protected String amContainerLogs;
protected String amHostHttpAddress;
private String amRPCAddress;
- protected long allocatedMB;
- protected long allocatedVCores;
- protected long reservedMB;
- protected long reservedVCores;
- protected int runningContainers;
- protected long memorySeconds;
- protected long vcoreSeconds;
+ private long allocatedMB;
+ private long allocatedVCores;
+ private long reservedMB;
+ private long reservedVCores;
+ private int runningContainers;
+ private long memorySeconds;
+ private long vcoreSeconds;
protected float queueUsagePercentage;
protected float clusterUsagePercentage;
// preemption info fields
- protected long preemptedResourceMB;
- protected long preemptedResourceVCores;
- protected int numNonAMContainerPreempted;
- protected int numAMContainerPreempted;
+ private long preemptedResourceMB;
+ private long preemptedResourceVCores;
+ private int numNonAMContainerPreempted;
+ private int numAMContainerPreempted;
private long preemptedMemorySeconds;
private long preemptedVcoreSeconds;
@@ -142,12 +143,11 @@ public class AppInfo {
|| YarnApplicationState.NEW_SAVING == this.state
|| YarnApplicationState.SUBMITTED == this.state
|| YarnApplicationState.ACCEPTED == this.state;
- this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app
- .getFinishTime() == 0 ? "ApplicationMaster" : "History");
+ this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED"
+ : (app.getFinishTime() == 0 ? "ApplicationMaster" : "History");
if (!trackingUrlIsNotReady) {
this.trackingUrl =
- WebAppUtils.getURLWithScheme(schemePrefix,
- trackingUrl);
+ WebAppUtils.getURLWithScheme(schemePrefix, trackingUrl);
this.trackingUrlPretty = this.trackingUrl;
} else {
this.trackingUrlPretty = "UNASSIGNED";
@@ -162,15 +162,15 @@ public class AppInfo {
this.priority = 0;
if (app.getApplicationPriority() != null) {
- this.priority = app.getApplicationPriority()
- .getPriority();
+ this.priority = app.getApplicationPriority().getPriority();
}
this.progress = app.getProgress() * 100;
this.diagnostics = app.getDiagnostics().toString();
if (diagnostics == null || diagnostics.isEmpty()) {
this.diagnostics = "";
}
- if (app.getApplicationTags() != null && !app.getApplicationTags().isEmpty()) {
+ if (app.getApplicationTags() != null
+ && !app.getApplicationTags().isEmpty()) {
this.applicationTags = Joiner.on(',').join(app.getApplicationTags());
}
this.finalStatus = app.getFinalApplicationStatus();
@@ -178,8 +178,8 @@ public class AppInfo {
if (hasAccess) {
this.startedTime = app.getStartTime();
this.finishedTime = app.getFinishTime();
- this.elapsedTime = Times.elapsed(app.getStartTime(),
- app.getFinishTime());
+ this.elapsedTime =
+ Times.elapsed(app.getStartTime(), app.getFinishTime());
this.logAggregationStatus = app.getLogAggregationStatusForAppReport();
RMAppAttempt attempt = app.getCurrentAppAttempt();
if (attempt != null) {
@@ -194,8 +194,8 @@ public class AppInfo {
this.amRPCAddress = getAmRPCAddressFromRMAppAttempt(attempt);
- ApplicationResourceUsageReport resourceReport = attempt
- .getApplicationResourceUsageReport();
+ ApplicationResourceUsageReport resourceReport =
+ attempt.getApplicationResourceUsageReport();
if (resourceReport != null) {
Resource usedResources = resourceReport.getUsedResources();
Resource reservedResources = resourceReport.getReservedResources();
@@ -208,10 +208,11 @@ public class AppInfo {
clusterUsagePercentage = resourceReport.getClusterUsagePercentage();
}
- /* When the deSelects parameter contains "resourceRequests",
- it skips returning massive ResourceRequest objects and vice versa.
- Default behavior is no skipping. (YARN-6280)
- */
+ /*
+ * When the deSelects parameter contains "resourceRequests", it skips
+ * returning massive ResourceRequest objects and vice versa. Default
+ * behavior is no skipping. (YARN-6280)
+ */
if (!deSelects.contains(DeSelectType.RESOURCE_REQUESTS)) {
List<ResourceRequest> resourceRequestsRaw = rm.getRMContext()
.getScheduler().getPendingResourceRequestsForAttempt(
@@ -228,12 +229,9 @@ public class AppInfo {
// copy preemption info fields
RMAppMetrics appMetrics = app.getRMAppMetrics();
- numAMContainerPreempted =
- appMetrics.getNumAMContainersPreempted();
- preemptedResourceMB =
- appMetrics.getResourcePreempted().getMemorySize();
- numNonAMContainerPreempted =
- appMetrics.getNumNonAMContainersPreempted();
+ numAMContainerPreempted = appMetrics.getNumAMContainersPreempted();
+ preemptedResourceMB = appMetrics.getResourcePreempted().getMemorySize();
+ numNonAMContainerPreempted = appMetrics.getNumNonAMContainersPreempted();
preemptedResourceVCores =
appMetrics.getResourcePreempted().getVirtualCores();
memorySeconds = appMetrics.getMemorySeconds();
@@ -242,8 +240,7 @@ public class AppInfo {
preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds();
ApplicationSubmissionContext appSubmissionContext =
app.getApplicationSubmissionContext();
- unmanagedApplication =
- appSubmissionContext.getUnmanagedAM();
+ unmanagedApplication = appSubmissionContext.getUnmanagedAM();
appNodeLabelExpression =
app.getApplicationSubmissionContext().getNodeLabelExpression();
amNodeLabelExpression = (unmanagedApplication) ? null
@@ -286,6 +283,7 @@ public class AppInfo {
timeouts.add(timeout);
}
}
+
}
}
@@ -396,19 +394,19 @@ public class AppInfo {
public String getApplicationTags() {
return this.applicationTags;
}
-
+
public int getRunningContainers() {
return this.runningContainers;
}
-
+
public long getAllocatedMB() {
return this.allocatedMB;
}
-
+
public long getAllocatedVCores() {
return this.allocatedVCores;
}
-
+
public long getReservedMB() {
return this.reservedMB;
}
@@ -417,22 +415,6 @@ public class AppInfo {
return this.reservedVCores;
}
- public long getPreemptedMB() {
- return preemptedResourceMB;
- }
-
- public long getPreemptedVCores() {
- return preemptedResourceVCores;
- }
-
- public int getNumNonAMContainersPreempted() {
- return numNonAMContainerPreempted;
- }
-
- public int getNumAMContainersPreempted() {
- return numAMContainerPreempted;
- }
-
public long getMemorySeconds() {
return memorySeconds;
}
@@ -448,10 +430,15 @@ public class AppInfo {
public long getPreemptedVcoreSeconds() {
return preemptedVcoreSeconds;
}
+
public List<ResourceRequestInfo> getResourceRequests() {
return this.resourceRequests;
}
+ public void setResourceRequests(List<ResourceRequestInfo> resourceRequests) {
+ this.resourceRequests = resourceRequests;
+ }
+
public LogAggregationStatus getLogAggregationStatus() {
return this.logAggregationStatus;
}
@@ -475,4 +462,89 @@ public class AppInfo {
public ResourcesInfo getResourceInfo() {
return resourceInfo;
}
+
+ public long getPreemptedResourceMB() {
+ return preemptedResourceMB;
+ }
+
+ public void setPreemptedResourceMB(long preemptedResourceMB) {
+ this.preemptedResourceMB = preemptedResourceMB;
+ }
+
+ public long getPreemptedResourceVCores() {
+ return preemptedResourceVCores;
+ }
+
+ public void setPreemptedResourceVCores(long preemptedResourceVCores) {
+ this.preemptedResourceVCores = preemptedResourceVCores;
+ }
+
+ public int getNumNonAMContainerPreempted() {
+ return numNonAMContainerPreempted;
+ }
+
+ public void setNumNonAMContainerPreempted(int numNonAMContainerPreempted) {
+ this.numNonAMContainerPreempted = numNonAMContainerPreempted;
+ }
+
+ public int getNumAMContainerPreempted() {
+ return numAMContainerPreempted;
+ }
+
+ public void setNumAMContainerPreempted(int numAMContainerPreempted) {
+ this.numAMContainerPreempted = numAMContainerPreempted;
+ }
+
+ public void setPreemptedMemorySeconds(long preemptedMemorySeconds) {
+ this.preemptedMemorySeconds = preemptedMemorySeconds;
+ }
+
+ public void setPreemptedVcoreSeconds(long preemptedVcoreSeconds) {
+ this.preemptedVcoreSeconds = preemptedVcoreSeconds;
+ }
+
+ public void setAllocatedMB(long allocatedMB) {
+ this.allocatedMB = allocatedMB;
+ }
+
+ public void setAllocatedVCores(long allocatedVCores) {
+ this.allocatedVCores = allocatedVCores;
+ }
+
+ public void setReservedMB(long reservedMB) {
+ this.reservedMB = reservedMB;
+ }
+
+ public void setReservedVCores(long reservedVCores) {
+ this.reservedVCores = reservedVCores;
+ }
+
+ public void setRunningContainers(int runningContainers) {
+ this.runningContainers = runningContainers;
+ }
+
+ public void setMemorySeconds(long memorySeconds) {
+ this.memorySeconds = memorySeconds;
+ }
+
+ public void setVcoreSeconds(long vcoreSeconds) {
+ this.vcoreSeconds = vcoreSeconds;
+ }
+
+ public void setAppId(String appId) {
+ this.id = appId;
+ }
+
+ @VisibleForTesting
+ public void setAMHostHttpAddress(String amHost) {
+ this.amHostHttpAddress = amHost;
+ }
+
+ public void setState(YarnApplicationState state) {
+ this.state = state;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java
index 84f68f1..39837b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java
@@ -40,4 +40,8 @@ public class AppsInfo {
return app;
}
+ public void addAll(ArrayList<AppInfo> appsInfo) {
+ app.addAll(appsInfo);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 42361a3..6d75471 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -49,6 +49,8 @@ public final class RouterMetrics {
private MutableGaugeInt numAppsFailedKilled;
@Metric("# of application reports failed to be retrieved")
private MutableGaugeInt numAppsFailedRetrieved;
+ @Metric("# of multiple applications reports failed to be retrieved")
+ private MutableGaugeInt numMultipleAppsFailedRetrieved;
// Aggregate metrics are shared, and don't have to be looked up per call
@Metric("Total number of successful Submitted apps and latency(ms)")
@@ -59,6 +61,9 @@ public final class RouterMetrics {
private MutableRate totalSucceededAppsCreated;
@Metric("Total number of successful Retrieved app reports and latency(ms)")
private MutableRate totalSucceededAppsRetrieved;
+ @Metric("Total number of successful Retrieved multiple apps reports and "
+ + "latency(ms)")
+ private MutableRate totalSucceededMultipleAppsRetrieved;
/**
* Provide quantile counters for all latencies.
@@ -67,6 +72,7 @@ public final class RouterMetrics {
private MutableQuantiles getNewApplicationLatency;
private MutableQuantiles killApplicationLatency;
private MutableQuantiles getApplicationReportLatency;
+ private MutableQuantiles getApplicationsReportLatency;
private static volatile RouterMetrics INSTANCE = null;
private static MetricsRegistry registry;
@@ -83,6 +89,9 @@ public final class RouterMetrics {
getApplicationReportLatency =
registry.newQuantiles("getApplicationReportLatency",
"latency of get application report", "ops", "latency", 10);
+ getApplicationsReportLatency =
+ registry.newQuantiles("getApplicationsReportLatency",
+ "latency of get applications report", "ops", "latency", 10);
}
public static RouterMetrics getMetrics() {
@@ -125,6 +134,11 @@ public final class RouterMetrics {
}
@VisibleForTesting
+ public long getNumSucceededMultipleAppsRetrieved() {
+ return totalSucceededMultipleAppsRetrieved.lastStat().numSamples();
+ }
+
+ @VisibleForTesting
public double getLatencySucceededAppsCreated() {
return totalSucceededAppsCreated.lastStat().mean();
}
@@ -145,6 +159,11 @@ public final class RouterMetrics {
}
@VisibleForTesting
+ public double getLatencySucceededMultipleGetAppReport() {
+ return totalSucceededMultipleAppsRetrieved.lastStat().mean();
+ }
+
+ @VisibleForTesting
public int getAppsFailedCreated() {
return numAppsFailedCreated.value();
}
@@ -164,6 +183,11 @@ public final class RouterMetrics {
return numAppsFailedRetrieved.value();
}
+ @VisibleForTesting
+ public int getMultipleAppsFailedRetrieved() {
+ return numMultipleAppsFailedRetrieved.value();
+ }
+
public void succeededAppsCreated(long duration) {
totalSucceededAppsCreated.add(duration);
getNewApplicationLatency.add(duration);
@@ -184,6 +208,11 @@ public final class RouterMetrics {
getApplicationReportLatency.add(duration);
}
+ public void succeededMultipleAppsRetrieved(long duration) {
+ totalSucceededMultipleAppsRetrieved.add(duration);
+ getApplicationsReportLatency.add(duration);
+ }
+
public void incrAppsFailedCreated() {
numAppsFailedCreated.incr();
}
@@ -200,4 +229,8 @@ public final class RouterMetrics {
numAppsFailedRetrieved.incr();
}
+ public void incrMultipleAppsFailedRetrieved() {
+ numMultipleAppsFailedRetrieved.incr();
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 4c7d4b1..3a91e35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -25,6 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -102,9 +107,15 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
private RouterPolicyFacade policyFacade;
private RouterMetrics routerMetrics;
private final Clock clock = new MonotonicClock();
+ private boolean returnPartialReport;
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
+ /**
+ * Thread pool used for asynchronous operations.
+ */
+ private ExecutorService threadpool;
+
@Override
public void init(String user) {
federationFacade = FederationStateStoreFacade.getInstance();
@@ -125,6 +136,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
routerMetrics = RouterMetrics.getMetrics();
+ threadpool = Executors.newCachedThreadPool();
+
+ returnPartialReport =
+ conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
+ YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
}
private SubClusterId getRandomActiveSubCluster(
@@ -586,6 +602,99 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
return response;
}
+ /**
+ * The Yarn Router will forward the request to all the Yarn RMs in parallel,
+ * after that it will group all the ApplicationReports by the ApplicationId.
+ * <p>
+ * Possible failure:
+ * <p>
+ * Client: identical behavior as {@code RMWebServices}.
+ * <p>
+ * Router: the Client will timeout and resubmit the request.
+ * <p>
+ * ResourceManager: the Router calls each Yarn RM in parallel by using one
+ * thread for each Yarn RM. In case a Yarn RM fails, a single call will
+ * timeout. However the Router will merge the ApplicationReports it got, and
+ * provides a partial list to the client.
+ * <p>
+ * State Store: the Router will timeout and it will retry depending on the
+ * FederationFacade settings - if the failure happened before the select
+ * operation.
+ */
+ @Override
+ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
+ Set<String> statesQuery, String finalStatusQuery, String userQuery,
+ String queueQuery, String count, String startedBegin, String startedEnd,
+ String finishBegin, String finishEnd, Set<String> applicationTypes,
+ Set<String> applicationTags, Set<String> unselectedFields) {
+ AppsInfo apps = new AppsInfo();
+ long startTime = clock.getTime();
+
+ Map<SubClusterId, SubClusterInfo> subClustersActive = null;
+ try {
+ subClustersActive = federationFacade.getSubClusters(true);
+ } catch (YarnException e) {
+ routerMetrics.incrMultipleAppsFailedRetrieved();
+ return null;
+ }
+
+ // Send the requests in parallel
+
+ ExecutorCompletionService<AppsInfo> compSvc =
+ new ExecutorCompletionService<AppsInfo>(this.threadpool);
+
+ for (final SubClusterInfo info : subClustersActive.values()) {
+ compSvc.submit(new Callable<AppsInfo>() {
+ @Override
+ public AppsInfo call() {
+ DefaultRequestInterceptorREST interceptor =
+ getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
+ info.getClientRMServiceAddress());
+ AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery,
+ finalStatusQuery, userQuery, queueQuery, count, startedBegin,
+ startedEnd, finishBegin, finishEnd, applicationTypes,
+ applicationTags, unselectedFields);
+
+ if (rmApps == null) {
+ routerMetrics.incrMultipleAppsFailedRetrieved();
+ LOG.error("Subcluster " + info.getSubClusterId()
+ + " failed to return appReport.");
+ return null;
+ }
+ return rmApps;
+ }
+ });
+ }
+
+ // Collect all the responses in parallel
+
+ for (int i = 0; i < subClustersActive.values().size(); i++) {
+ try {
+ Future<AppsInfo> future = compSvc.take();
+ AppsInfo appsResponse = future.get();
+
+ long stopTime = clock.getTime();
+ routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
+
+ if (appsResponse != null) {
+ apps.addAll(appsResponse.getApps());
+ }
+ } catch (Throwable e) {
+ routerMetrics.incrMultipleAppsFailedRetrieved();
+ LOG.warn("Failed to get application report ", e);
+ }
+ }
+
+ if (apps.getApps().isEmpty()) {
+ return null;
+ }
+
+ // Merge all the application reports got from all the available Yarn RMs
+
+ return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(),
+ returnPartialReport);
+ }
+
@Override
public ClusterInfo get() {
return getClusterInfo();
@@ -640,15 +749,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
}
@Override
- public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
- Set<String> statesQuery, String finalStatusQuery, String userQuery,
- String queueQuery, String count, String startedBegin, String startedEnd,
- String finishBegin, String finishEnd, Set<String> applicationTypes,
- Set<String> applicationTags, Set<String> unselectedFields) {
- throw new NotImplementedException();
- }
-
- @Override
public AppState getAppState(HttpServletRequest hsr, String appId)
throws AuthorizationException {
throw new NotImplementedException();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
index 18618ee..e633b6a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.router.webapp;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -33,7 +35,11 @@ import javax.ws.rs.core.Response.ResponseBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -55,6 +61,8 @@ public final class RouterWebServiceUtil {
private static final Log LOG =
LogFactory.getLog(RouterWebServiceUtil.class.getName());
+ private final static String PARTIAL_REPORT = "Partial Report ";
+
/** Disable constructor. */
private RouterWebServiceUtil() {
}
@@ -224,4 +232,103 @@ public final class RouterWebServiceUtil {
}
-}
\ No newline at end of file
+ /**
+ * Merges a list of AppInfo grouping by ApplicationId. Our current policy
+ * is to merge the application reports from the reacheable SubClusters.
+ * Via configuration parameter, we decide whether to return applications
+ * for which the primary AM is missing or to omit them.
+ *
+ * @param appsInfo a list of AppInfo to merge
+ * @param returnPartialResult if the merge AppsInfo should contain partial
+ * result or not
+ * @return the merged AppsInfo
+ */
+ public static AppsInfo mergeAppsInfo(ArrayList<AppInfo> appsInfo,
+ boolean returnPartialResult) {
+ AppsInfo allApps = new AppsInfo();
+
+ Map<String, AppInfo> federationAM = new HashMap<String, AppInfo>();
+ Map<String, AppInfo> federationUAMSum = new HashMap<String, AppInfo>();
+ for (AppInfo a : appsInfo) {
+ // Check if this AppInfo is an AM
+ if (a.getAMHostHttpAddress() != null) {
+ // Insert in the list of AM
+ federationAM.put(a.getAppId(), a);
+ // Check if there are any UAM found before
+ if (federationUAMSum.containsKey(a.getAppId())) {
+ // Merge the current AM with the found UAM
+ mergeAMWithUAM(a, federationUAMSum.get(a.getAppId()));
+ // Remove the sum of the UAMs
+ federationUAMSum.remove(a.getAppId());
+ }
+ // This AppInfo is an UAM
+ } else {
+ if (federationAM.containsKey(a.getAppId())) {
+ // Merge the current UAM with its own AM
+ mergeAMWithUAM(federationAM.get(a.getAppId()), a);
+ } else if (federationUAMSum.containsKey(a.getAppId())) {
+ // Merge the current UAM with its own UAM and update the list of UAM
+ federationUAMSum.put(a.getAppId(),
+ mergeUAMWithUAM(federationUAMSum.get(a.getAppId()), a));
+ } else {
+ // Insert in the list of UAM
+ federationUAMSum.put(a.getAppId(), a);
+ }
+ }
+ }
+
+ // Check the remaining UAMs are depending or not from federation
+ for (AppInfo a : federationUAMSum.values()) {
+ if (returnPartialResult || (a.getName() != null
+ && !(a.getName().startsWith(UnmanagedApplicationManager.APP_NAME)
+ || a.getName().startsWith(PARTIAL_REPORT)))) {
+ federationAM.put(a.getAppId(), a);
+ }
+ }
+
+ allApps.addAll(new ArrayList<AppInfo>(federationAM.values()));
+ return allApps;
+ }
+
+ private static AppInfo mergeUAMWithUAM(AppInfo uam1, AppInfo uam2) {
+ AppInfo partialReport = new AppInfo();
+ partialReport.setAppId(uam1.getAppId());
+ partialReport.setName(PARTIAL_REPORT + uam1.getAppId());
+ // We pick the status of the first uam
+ partialReport.setState(uam1.getState());
+ // Merge the newly partial AM with UAM1 and then with UAM2
+ mergeAMWithUAM(partialReport, uam1);
+ mergeAMWithUAM(partialReport, uam2);
+ return partialReport;
+ }
+
+ private static void mergeAMWithUAM(AppInfo am, AppInfo uam) {
+ am.setPreemptedResourceMB(
+ am.getPreemptedResourceMB() + uam.getPreemptedResourceMB());
+ am.setPreemptedResourceVCores(
+ am.getPreemptedResourceVCores() + uam.getPreemptedResourceVCores());
+ am.setNumNonAMContainerPreempted(am.getNumNonAMContainerPreempted()
+ + uam.getNumNonAMContainerPreempted());
+ am.setNumAMContainerPreempted(
+ am.getNumAMContainerPreempted() + uam.getNumAMContainerPreempted());
+ am.setPreemptedMemorySeconds(
+ am.getPreemptedMemorySeconds() + uam.getPreemptedMemorySeconds());
+ am.setPreemptedVcoreSeconds(
+ am.getPreemptedVcoreSeconds() + uam.getPreemptedVcoreSeconds());
+
+ if (am.getState() == YarnApplicationState.RUNNING
+ && uam.getState() == am.getState()) {
+
+ am.getResourceRequests().addAll(uam.getResourceRequests());
+
+ am.setAllocatedMB(am.getAllocatedMB() + uam.getAllocatedMB());
+ am.setAllocatedVCores(am.getAllocatedVCores() + uam.getAllocatedVCores());
+ am.setReservedMB(am.getReservedMB() + uam.getReservedMB());
+ am.setReservedVCores(am.getReservedVCores() + uam.getReservedMB());
+ am.setRunningContainers(
+ am.getRunningContainers() + uam.getRunningContainers());
+ am.setMemorySeconds(am.getMemorySeconds() + uam.getMemorySeconds());
+ am.setVcoreSeconds(am.getVcoreSeconds() + uam.getVcoreSeconds());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 3cdafd8..4c18ace 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -196,6 +196,45 @@ public class TestRouterMetrics {
Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved());
}
+ /**
+ * This test validates the correctness of the metric: Retrieved Multiple Apps
+ * successfully.
+ */
+ @Test
+ public void testSucceededMultipleAppsReport() {
+
+ long totalGoodBefore = metrics.getNumSucceededMultipleAppsRetrieved();
+
+ goodSubCluster.getApplicationsReport(100);
+
+ Assert.assertEquals(totalGoodBefore + 1,
+ metrics.getNumSucceededMultipleAppsRetrieved());
+ Assert.assertEquals(100, metrics.getLatencySucceededMultipleGetAppReport(),
+ 0);
+
+ goodSubCluster.getApplicationsReport(200);
+
+ Assert.assertEquals(totalGoodBefore + 2,
+ metrics.getNumSucceededMultipleAppsRetrieved());
+ Assert.assertEquals(150, metrics.getLatencySucceededMultipleGetAppReport(),
+ 0);
+ }
+
+ /**
+ * This test validates the correctness of the metric: Failed to retrieve
+ * Multiple Apps.
+ */
+ @Test
+ public void testMulipleAppsReportFailed() {
+
+ long totalBadbefore = metrics.getMultipleAppsFailedRetrieved();
+
+ badSubCluster.getApplicationsReport();
+
+ Assert.assertEquals(totalBadbefore + 1,
+ metrics.getMultipleAppsFailedRetrieved());
+ }
+
// Records failures for all calls
private class MockBadSubCluster {
public void getNewApplication() {
@@ -217,6 +256,11 @@ public class TestRouterMetrics {
LOG.info("Mocked: failed getApplicationReport call");
metrics.incrAppsFailedRetrieved();
}
+
+ public void getApplicationsReport() {
+ LOG.info("Mocked: failed getApplicationsReport call");
+ metrics.incrMultipleAppsFailedRetrieved();
+ }
}
// Records successes for all calls
@@ -244,5 +288,11 @@ public class TestRouterMetrics {
duration);
metrics.succeededAppsRetrieved(duration);
}
+
+ public void getApplicationsReport(long duration) {
+ LOG.info("Mocked: successful getApplicationsReport call with duration {}",
+ duration);
+ metrics.succeededMultipleAppsRetrieved(duration);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index 91e601e..93527e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -18,26 +18,32 @@
package org.apache.hadoop.yarn.server.router.webapp;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* This class mocks the RESTRequestInterceptor.
*/
@@ -101,6 +107,27 @@ public class MockDefaultRequestInterceptorREST
}
@Override
+ public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
+ Set<String> statesQuery, String finalStatusQuery, String userQuery,
+ String queueQuery, String count, String startedBegin, String startedEnd,
+ String finishBegin, String finishEnd, Set<String> applicationTypes,
+ Set<String> applicationTags, Set<String> unselectedFields) {
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+ AppsInfo appsInfo = new AppsInfo();
+ AppInfo appInfo = new AppInfo();
+
+ appInfo.setAppId(
+ ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
+ applicationCounter.incrementAndGet()).toString());
+ appInfo.setAMHostHttpAddress("http://i_am_the_AM:1234");
+
+ appsInfo.add(appInfo);
+ return appsInfo;
+ }
+
+ @Override
public Response updateAppState(AppState targetState, HttpServletRequest hsr,
String appId) throws AuthorizationException, YarnException,
InterruptedException, IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index fb6cdd8..2ee62af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUt
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.junit.Assert;
import org.junit.Test;
@@ -374,4 +375,20 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
Assert.assertNull(response);
}
+ /**
+ * This test validates the correctness of GetApplicationsReport in case each
+ * subcluster provided one application.
+ */
+ @Test
+ public void testGetApplicationsReport()
+ throws YarnException, IOException, InterruptedException {
+
+ AppsInfo responseGet = interceptor.getApps(null, null, null, null, null,
+ null, null, null, null, null, null, null, null, null);
+
+ Assert.assertNotNull(responseGet);
+ Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getApps().size());
+ // The merged operations will be tested in TestRouterWebServiceUtil
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
index 48bc1a8..38b1027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
@@ -271,4 +272,48 @@ public class TestFederationInterceptorRESTRetry
.getApplicationHomeSubCluster().getHomeSubCluster());
}
+ /**
+ * This test validates the correctness of GetApps in case the cluster is
+ * composed of only 1 bad SubCluster.
+ */
+ @Test
+ public void testGetAppsOneBadSC()
+ throws YarnException, IOException, InterruptedException {
+
+ setupCluster(Arrays.asList(bad2));
+
+ AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
+ null, null, null, null, null, null, null, null);
+ Assert.assertNull(response);
+ }
+
+ /**
+ * This test validates the correctness of GetApps in case the cluster is
+ * composed of only 2 bad SubClusters.
+ */
+ @Test
+ public void testGetAppsTwoBadSCs()
+ throws YarnException, IOException, InterruptedException {
+ setupCluster(Arrays.asList(bad1, bad2));
+
+ AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
+ null, null, null, null, null, null, null, null);
+ Assert.assertNull(response);
+ }
+
+ /**
+ * This test validates the correctness of GetApps in case the cluster is
+ * composed of only 1 bad SubCluster and a good one.
+ */
+ @Test
+ public void testGetAppsOneBadOneGood()
+ throws YarnException, IOException, InterruptedException {
+ setupCluster(Arrays.asList(good, bad2));
+
+ AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
+ null, null, null, null, null, null, null, null);
+ Assert.assertNotNull(response);
+ Assert.assertEquals(1, response.getApps().size());
+ }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc8893ed/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
new file mode 100644
index 0000000..810432a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
+import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class to validate RouterWebServiceUtil methods.
+ */
+public class TestRouterWebServiceUtil {
+
+ private static final ApplicationId APPID1 = ApplicationId.newInstance(1, 1);
+ private static final ApplicationId APPID2 = ApplicationId.newInstance(2, 1);
+ private static final ApplicationId APPID3 = ApplicationId.newInstance(3, 1);
+ private static final ApplicationId APPID4 = ApplicationId.newInstance(4, 1);
+
+ /**
+ * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+ * in case we want to merge 4 AMs. The expected result would be the same 4
+ * AMs.
+ */
+ @Test
+ public void testMerge4DifferentApps() {
+
+ AppsInfo apps = new AppsInfo();
+ int value = 1000;
+
+ AppInfo app1 = new AppInfo();
+ app1.setAppId(APPID1.toString());
+ app1.setAMHostHttpAddress("http://i_am_the_AM1:1234");
+ app1.setState(YarnApplicationState.FINISHED);
+ app1.setNumAMContainerPreempted(value);
+ apps.add(app1);
+
+ AppInfo app2 = new AppInfo();
+ app2.setAppId(APPID2.toString());
+ app2.setAMHostHttpAddress("http://i_am_the_AM2:1234");
+ app2.setState(YarnApplicationState.ACCEPTED);
+ app2.setAllocatedVCores(2 * value);
+
+ apps.add(app2);
+
+ AppInfo app3 = new AppInfo();
+ app3.setAppId(APPID3.toString());
+ app3.setAMHostHttpAddress("http://i_am_the_AM3:1234");
+ app3.setState(YarnApplicationState.RUNNING);
+ app3.setReservedMB(3 * value);
+ apps.add(app3);
+
+ AppInfo app4 = new AppInfo();
+ app4.setAppId(APPID4.toString());
+ app4.setAMHostHttpAddress("http://i_am_the_AM4:1234");
+ app4.setState(YarnApplicationState.NEW);
+ app4.setAllocatedMB(4 * value);
+ apps.add(app4);
+
+ AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(4, result.getApps().size());
+
+ List<String> appIds = new ArrayList<String>();
+ AppInfo appInfo1 = null, appInfo2 = null, appInfo3 = null, appInfo4 = null;
+ for (AppInfo app : result.getApps()) {
+ appIds.add(app.getAppId());
+ if (app.getAppId().equals(APPID1.toString())) {
+ appInfo1 = app;
+ }
+ if (app.getAppId().equals(APPID2.toString())) {
+ appInfo2 = app;
+ }
+ if (app.getAppId().equals(APPID3.toString())) {
+ appInfo3 = app;
+ }
+ if (app.getAppId().equals(APPID4.toString())) {
+ appInfo4 = app;
+ }
+ }
+
+ Assert.assertTrue(appIds.contains(APPID1.toString()));
+ Assert.assertTrue(appIds.contains(APPID2.toString()));
+ Assert.assertTrue(appIds.contains(APPID3.toString()));
+ Assert.assertTrue(appIds.contains(APPID4.toString()));
+
+ // Check preservations APP1
+ Assert.assertEquals(app1.getState(), appInfo1.getState());
+ Assert.assertEquals(app1.getNumAMContainerPreempted(),
+ appInfo1.getNumAMContainerPreempted());
+
+ // Check preservations APP2
+ Assert.assertEquals(app2.getState(), appInfo2.getState());
+ Assert.assertEquals(app3.getAllocatedVCores(),
+ appInfo3.getAllocatedVCores());
+
+ // Check preservations APP3
+ Assert.assertEquals(app3.getState(), appInfo3.getState());
+ Assert.assertEquals(app3.getReservedMB(), appInfo3.getReservedMB());
+
+ // Check preservations APP3
+ Assert.assertEquals(app4.getState(), appInfo4.getState());
+ Assert.assertEquals(app3.getAllocatedMB(), appInfo3.getAllocatedMB());
+ }
+
+ /**
+ * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+ * in case we want to merge 2 UAMs and their own AM. The status of the AM is
+ * FINISHED, so we check the correctness of the merging of the historical
+ * values. The expected result would be 1 report with the merged information.
+ */
+ @Test
+ public void testMergeAppsFinished() {
+
+ AppsInfo apps = new AppsInfo();
+
+ String amHost = "http://i_am_the_AM1:1234";
+ AppInfo am = new AppInfo();
+ am.setAppId(APPID1.toString());
+ am.setAMHostHttpAddress(amHost);
+ am.setState(YarnApplicationState.FINISHED);
+
+ int value = 1000;
+ setAppInfoFinished(am, value);
+
+ apps.add(am);
+
+ AppInfo uam1 = new AppInfo();
+ uam1.setAppId(APPID1.toString());
+ apps.add(uam1);
+
+ setAppInfoFinished(uam1, value);
+
+ AppInfo uam2 = new AppInfo();
+ uam2.setAppId(APPID1.toString());
+ apps.add(uam2);
+
+ setAppInfoFinished(uam2, value);
+
+ // in this case the result does not change if we enable partial result
+ AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(1, result.getApps().size());
+
+ AppInfo app = result.getApps().get(0);
+
+ Assert.assertEquals(APPID1.toString(), app.getAppId());
+ Assert.assertEquals(amHost, app.getAMHostHttpAddress());
+ Assert.assertEquals(value * 3, app.getPreemptedResourceMB());
+ Assert.assertEquals(value * 3, app.getPreemptedResourceVCores());
+ Assert.assertEquals(value * 3, app.getNumNonAMContainerPreempted());
+ Assert.assertEquals(value * 3, app.getNumAMContainerPreempted());
+ Assert.assertEquals(value * 3, app.getPreemptedMemorySeconds());
+ Assert.assertEquals(value * 3, app.getPreemptedVcoreSeconds());
+ }
+
+ private void setAppInfoFinished(AppInfo am, int value) {
+ am.setPreemptedResourceMB(value);
+ am.setPreemptedResourceVCores(value);
+ am.setNumNonAMContainerPreempted(value);
+ am.setNumAMContainerPreempted(value);
+ am.setPreemptedMemorySeconds(value);
+ am.setPreemptedVcoreSeconds(value);
+ }
+
+ /**
+ * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+ * in case we want to merge 2 UAMs and their own AM. The status of the AM is
+ * RUNNING, so we check the correctness of the merging of the runtime values.
+ * The expected result would be 1 report with the merged information.
+ */
+ @Test
+ public void testMergeAppsRunning() {
+
+ AppsInfo apps = new AppsInfo();
+
+ String amHost = "http://i_am_the_AM2:1234";
+ AppInfo am = new AppInfo();
+ am.setAppId(APPID2.toString());
+ am.setAMHostHttpAddress(amHost);
+ am.setState(YarnApplicationState.RUNNING);
+
+ int value = 1000;
+ setAppInfoRunning(am, value);
+
+ apps.add(am);
+
+ AppInfo uam1 = new AppInfo();
+ uam1.setAppId(APPID2.toString());
+ uam1.setState(YarnApplicationState.RUNNING);
+ apps.add(uam1);
+
+ setAppInfoRunning(uam1, value);
+
+ AppInfo uam2 = new AppInfo();
+ uam2.setAppId(APPID2.toString());
+ uam2.setState(YarnApplicationState.RUNNING);
+ apps.add(uam2);
+
+ setAppInfoRunning(uam2, value);
+
+ // in this case the result does not change if we enable partial result
+ AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(1, result.getApps().size());
+
+ AppInfo app = result.getApps().get(0);
+
+ Assert.assertEquals(APPID2.toString(), app.getAppId());
+ Assert.assertEquals(amHost, app.getAMHostHttpAddress());
+ Assert.assertEquals(value * 3, app.getAllocatedMB());
+ Assert.assertEquals(value * 3, app.getAllocatedVCores());
+ Assert.assertEquals(value * 3, app.getReservedMB());
+ Assert.assertEquals(value * 3, app.getReservedVCores());
+ Assert.assertEquals(value * 3, app.getRunningContainers());
+ Assert.assertEquals(value * 3, app.getMemorySeconds());
+ Assert.assertEquals(value * 3, app.getVcoreSeconds());
+ Assert.assertEquals(3, app.getResourceRequests().size());
+ }
+
+ private void setAppInfoRunning(AppInfo am, int value) {
+ am.getResourceRequests().add(new ResourceRequestInfo());
+ am.setAllocatedMB(value);
+ am.setAllocatedVCores(value);
+ am.setReservedMB(value);
+ am.setReservedVCores(value);
+ am.setRunningContainers(value);
+ am.setMemorySeconds(value);
+ am.setVcoreSeconds(value);
+ }
+
+ /**
+ * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+ * in case we want to merge 2 UAMs without their own AM. The expected result
+ * would be an empty report or a partial report of the 2 UAMs depending on the
+ * selected policy.
+ */
+ @Test
+ public void testMerge2UAM() {
+
+ AppsInfo apps = new AppsInfo();
+
+ AppInfo app1 = new AppInfo();
+ app1.setAppId(APPID1.toString());
+ app1.setName(UnmanagedApplicationManager.APP_NAME);
+ app1.setState(YarnApplicationState.RUNNING);
+ apps.add(app1);
+
+ AppInfo app2 = new AppInfo();
+ app2.setAppId(APPID1.toString());
+ app2.setName(UnmanagedApplicationManager.APP_NAME);
+ app2.setState(YarnApplicationState.RUNNING);
+ apps.add(app2);
+
+ AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(0, result.getApps().size());
+
+ // By enabling partial result, the expected result would be a partial report
+ // of the 2 UAMs
+ AppsInfo result2 = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), true);
+ Assert.assertNotNull(result2);
+ Assert.assertEquals(1, result2.getApps().size());
+ Assert.assertEquals(YarnApplicationState.RUNNING,
+ result2.getApps().get(0).getState());
+ }
+
+ /**
+ * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+ * in case we want to merge 1 UAM that does not depend on Federation. The
+ * excepted result would be the same app report.
+ */
+ @Test
+ public void testMergeUAM() {
+
+ AppsInfo apps = new AppsInfo();
+
+ AppInfo app1 = new AppInfo();
+ app1.setAppId(APPID1.toString());
+ app1.setName("Test");
+ apps.add(app1);
+
+ // in this case the result does not change if we enable partial result
+ AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+ Assert.assertNotNull(result);
+ Assert.assertEquals(1, result.getApps().size());
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org