You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/08/31 22:05:53 UTC
hadoop git commit: YARN-7095. Federation: routing
getNode/getNodes/getMetrics REST invocations transparently to multiple RMs.
(Giovanni Matteo Fumarola via Subru).
Repository: hadoop
Updated Branches:
refs/heads/trunk d4417dae4 -> bac4e8cca
YARN-7095. Federation: routing getNode/getNodes/getMetrics REST invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bac4e8cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bac4e8cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bac4e8cc
Branch: refs/heads/trunk
Commit: bac4e8cca8b54405f5e37b90e545b93bbadee0f4
Parents: d4417da
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Aug 31 15:05:41 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Aug 31 15:05:41 2017 -0700
----------------------------------------------------------------------
.../webapp/dao/ClusterMetricsInfo.java | 162 +++++++++---
.../resourcemanager/webapp/dao/NodeInfo.java | 17 +-
.../resourcemanager/webapp/dao/NodesInfo.java | 4 +
.../webapp/FederationInterceptorREST.java | 234 ++++++++++++++++-
.../router/webapp/RouterWebServiceUtil.java | 101 ++++++-
.../MockDefaultRequestInterceptorREST.java | 43 +++
.../webapp/TestFederationInterceptorREST.java | 54 +++-
.../TestFederationInterceptorRESTRetry.java | 207 ++++++++++++++-
.../router/webapp/TestRouterWebServiceUtil.java | 262 +++++++++++++++++++
9 files changed, 1030 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
index dc42eb6..3214cb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
@@ -31,35 +31,35 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
@XmlAccessorType(XmlAccessType.FIELD)
public class ClusterMetricsInfo {
- protected int appsSubmitted;
- protected int appsCompleted;
- protected int appsPending;
- protected int appsRunning;
- protected int appsFailed;
- protected int appsKilled;
-
- protected long reservedMB;
- protected long availableMB;
- protected long allocatedMB;
-
- protected long reservedVirtualCores;
- protected long availableVirtualCores;
- protected long allocatedVirtualCores;
-
- protected int containersAllocated;
- protected int containersReserved;
- protected int containersPending;
-
- protected long totalMB;
- protected long totalVirtualCores;
- protected int totalNodes;
- protected int lostNodes;
- protected int unhealthyNodes;
- protected int decommissioningNodes;
- protected int decommissionedNodes;
- protected int rebootedNodes;
- protected int activeNodes;
- protected int shutdownNodes;
+ private int appsSubmitted;
+ private int appsCompleted;
+ private int appsPending;
+ private int appsRunning;
+ private int appsFailed;
+ private int appsKilled;
+
+ private long reservedMB;
+ private long availableMB;
+ private long allocatedMB;
+
+ private long reservedVirtualCores;
+ private long availableVirtualCores;
+ private long allocatedVirtualCores;
+
+ private int containersAllocated;
+ private int containersReserved;
+ private int containersPending;
+
+ private long totalMB;
+ private long totalVirtualCores;
+ private int totalNodes;
+ private int lostNodes;
+ private int unhealthyNodes;
+ private int decommissioningNodes;
+ private int decommissionedNodes;
+ private int rebootedNodes;
+ private int activeNodes;
+ private int shutdownNodes;
public ClusterMetricsInfo() {
} // JAXB needs this
@@ -93,8 +93,8 @@ public class ClusterMetricsInfo {
if (rs instanceof CapacityScheduler) {
this.totalMB = availableMB + allocatedMB + reservedMB;
- this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores
- + containersReserved;
+ this.totalVirtualCores =
+ availableVirtualCores + allocatedVirtualCores + reservedVirtualCores;
} else {
this.totalMB = availableMB + allocatedMB;
this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores;
@@ -210,4 +210,104 @@ public class ClusterMetricsInfo {
return this.shutdownNodes;
}
+ public void setAppsSubmitted(int appsSubmitted) {
+ this.appsSubmitted = appsSubmitted;
+ }
+
+ public void setAppsCompleted(int appsCompleted) {
+ this.appsCompleted = appsCompleted;
+ }
+
+ public void setAppsPending(int appsPending) {
+ this.appsPending = appsPending;
+ }
+
+ public void setAppsRunning(int appsRunning) {
+ this.appsRunning = appsRunning;
+ }
+
+ public void setAppsFailed(int appsFailed) {
+ this.appsFailed = appsFailed;
+ }
+
+ public void setAppsKilled(int appsKilled) {
+ this.appsKilled = appsKilled;
+ }
+
+ public void setReservedMB(long reservedMB) {
+ this.reservedMB = reservedMB;
+ }
+
+ public void setAvailableMB(long availableMB) {
+ this.availableMB = availableMB;
+ }
+
+ public void setAllocatedMB(long allocatedMB) {
+ this.allocatedMB = allocatedMB;
+ }
+
+ public void setReservedVirtualCores(long reservedVirtualCores) {
+ this.reservedVirtualCores = reservedVirtualCores;
+ }
+
+ public void setAvailableVirtualCores(long availableVirtualCores) {
+ this.availableVirtualCores = availableVirtualCores;
+ }
+
+ public void setAllocatedVirtualCores(long allocatedVirtualCores) {
+ this.allocatedVirtualCores = allocatedVirtualCores;
+ }
+
+ public void setContainersAllocated(int containersAllocated) {
+ this.containersAllocated = containersAllocated;
+ }
+
+ public void setContainersReserved(int containersReserved) {
+ this.containersReserved = containersReserved;
+ }
+
+ public void setContainersPending(int containersPending) {
+ this.containersPending = containersPending;
+ }
+
+ public void setTotalMB(long totalMB) {
+ this.totalMB = totalMB;
+ }
+
+ public void setTotalVirtualCores(long totalVirtualCores) {
+ this.totalVirtualCores = totalVirtualCores;
+ }
+
+ public void setTotalNodes(int totalNodes) {
+ this.totalNodes = totalNodes;
+ }
+
+ public void setLostNodes(int lostNodes) {
+ this.lostNodes = lostNodes;
+ }
+
+ public void setUnhealthyNodes(int unhealthyNodes) {
+ this.unhealthyNodes = unhealthyNodes;
+ }
+
+ public void setDecommissioningNodes(int decommissioningNodes) {
+ this.decommissioningNodes = decommissioningNodes;
+ }
+
+ public void setDecommissionedNodes(int decommissionedNodes) {
+ this.decommissionedNodes = decommissionedNodes;
+ }
+
+ public void setRebootedNodes(int rebootedNodes) {
+ this.rebootedNodes = rebootedNodes;
+ }
+
+ public void setActiveNodes(int activeNodes) {
+ this.activeNodes = activeNodes;
+ }
+
+ public void setShutdownNodes(int shutdownNodes) {
+ this.shutdownNodes = shutdownNodes;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
index 3416e52..2530c8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
@@ -33,16 +33,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
+import com.google.common.annotations.VisibleForTesting;
+
@XmlRootElement(name = "node")
@XmlAccessorType(XmlAccessType.FIELD)
public class NodeInfo {
protected String rack;
protected NodeState state;
- protected String id;
+ private String id;
protected String nodeHostName;
protected String nodeHTTPAddress;
- protected long lastHealthUpdate;
+ private long lastHealthUpdate;
protected String version;
protected String healthReport;
protected int numContainers;
@@ -184,4 +186,15 @@ public class NodeInfo {
public ResourceUtilizationInfo getResourceUtilization() {
return this.resourceUtilization;
}
+
+ /** Sets the id of this node; exposed for tests. */
+ @VisibleForTesting
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ /** Sets the last health update of this node; exposed for tests. */
+ @VisibleForTesting
+ public void setLastHealthUpdate(long lastHealthUpdate) {
+ this.lastHealthUpdate = lastHealthUpdate;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java
index 7dacd10..8174be0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java
@@ -39,4 +39,8 @@ public class NodesInfo {
public ArrayList<NodeInfo> getNodes() {
return node;
}
+
+ /**
+ * Appends all the given NodeInfo to the list held by this NodesInfo.
+ *
+ * @param nodesInfo the nodes to append
+ */
+ public void addAll(ArrayList<NodeInfo> nodesInfo) {
+ node.addAll(nodesInfo);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 3a91e35..bfd35c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -28,7 +28,6 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.servlet.http.HttpServletRequest;
@@ -40,6 +39,7 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -85,10 +85,12 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Extends the {@code AbstractRESTRequestInterceptor} class and provides an
@@ -136,7 +138,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
routerMetrics = RouterMetrics.getMetrics();
- threadpool = Executors.newCachedThreadPool();
+ threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat("FederationInterceptorREST #%d").build());
returnPartialReport =
conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
@@ -695,39 +698,237 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
returnPartialReport);
}
+ /**
+ * The Yarn Router will forward the request to all the SubClusters to find
+ * where the node is running.
+ * <p>
+ * Possible failure:
+ * <p>
+ * Client: identical behavior as {@code RMWebServices}.
+ * <p>
+ * Router: the Client will timeout and resubmit the request.
+ * <p>
+ * ResourceManager: a failed RM is skipped; the call fails if no RM has it.
+ * <p>
+ * State Store: the Router will timeout and it will retry depending on the
+ * FederationFacade settings - if the failure happened before the select
+ * operation.
+ */
@Override
- public ClusterInfo get() {
- return getClusterInfo();
+ public NodeInfo getNode(String nodeId) {
+ Map<SubClusterId, SubClusterInfo> subClustersActive = null;
+ try {
+ subClustersActive = federationFacade.getSubClusters(true);
+ } catch (YarnException e) {
+ throw new NotFoundException(e.getMessage());
+ }
+
+ if (subClustersActive.isEmpty()) {
+ throw new NotFoundException(
+ FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
+ }
+
+ // Send the requests in parallel
+
+ ExecutorCompletionService<NodeInfo> compSvc =
+ new ExecutorCompletionService<NodeInfo>(this.threadpool);
+
+ for (final SubClusterInfo info : subClustersActive.values()) {
+ compSvc.submit(new Callable<NodeInfo>() {
+ @Override
+ public NodeInfo call() {
+ DefaultRequestInterceptorREST interceptor =
+ getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
+ info.getClientRMServiceAddress());
+ try {
+ NodeInfo nodeInfo = interceptor.getNode(nodeId);
+ return nodeInfo;
+ } catch (Exception e) {
+ LOG.error("Subcluster " + info.getSubClusterId()
+ + " failed to return nodeInfo: " + e.getMessage());
+ return null;
+ }
+ }
+ });
+ }
+
+ // Collect all the responses in parallel
+ NodeInfo nodeInfo = null;
+ for (int i = 0; i < subClustersActive.size(); i++) {
+ try {
+ Future<NodeInfo> future = compSvc.take();
+ NodeInfo nodeResponse = future.get();
+
+ // Check if the node was found in this SubCluster
+ if (nodeResponse != null) {
+ // Keep the most recent health report when the node was already found
+ // in a different SubCluster (e.g. it moved between SubClusters)
+ if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse
+ .getLastHealthUpdate()) {
+ nodeInfo = nodeResponse;
+ }
+ }
+ } catch (Throwable e) {
+ LOG.warn("Failed to get node report ", e);
+ }
+ }
+ if (nodeInfo == null) {
+ throw new NotFoundException("nodeId, " + nodeId + ", is not found");
+ }
+ return nodeInfo;
}
+ /**
+ * The Yarn Router will forward the request to all the Yarn RMs in parallel,
+ * after that it will remove all the duplicated NodeInfo by using the NodeId.
+ * <p>
+ * Possible failure:
+ * <p>
+ * Client: identical behavior as {@code RMWebServices}.
+ * <p>
+ * Router: the Client will timeout and resubmit the request.
+ * <p>
+ * ResourceManager: the Router calls each Yarn RM in parallel by using one
+ * thread for each Yarn RM. In case a Yarn RM fails, a single call will
+ * timeout. However the Router will use the NodesInfo it got, and provides a
+ * partial list to the client.
+ * <p>
+ * State Store: the Router will timeout and it will retry depending on the
+ * FederationFacade settings - if the failure happened before the select
+ * operation.
+ */
@Override
- public ClusterInfo getClusterInfo() {
- throw new NotImplementedException();
+ public NodesInfo getNodes(String states) {
+
+ NodesInfo nodes = new NodesInfo();
+
+ Map<SubClusterId, SubClusterInfo> subClustersActive = null;
+ try {
+ subClustersActive = federationFacade.getSubClusters(true);
+ } catch (YarnException e) {
+ LOG.error(e.getMessage(), e);
+ return new NodesInfo();
+ }
+
+ // Send the requests in parallel
+
+ ExecutorCompletionService<NodesInfo> compSvc =
+ new ExecutorCompletionService<NodesInfo>(this.threadpool);
+
+ for (final SubClusterInfo info : subClustersActive.values()) {
+ compSvc.submit(new Callable<NodesInfo>() {
+ @Override
+ public NodesInfo call() {
+ DefaultRequestInterceptorREST interceptor =
+ getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
+ info.getClientRMServiceAddress());
+ try {
+ NodesInfo nodesInfo = interceptor.getNodes(states);
+ return nodesInfo;
+ } catch (Exception e) {
+ LOG.error("Subcluster " + info.getSubClusterId()
+ + " failed to return nodesInfo.", e);
+ return null;
+ }
+ }
+ });
+ }
+
+ // Collect all the responses in parallel
+
+ for (int i = 0; i < subClustersActive.size(); i++) {
+ try {
+ Future<NodesInfo> future = compSvc.take();
+ NodesInfo nodesResponse = future.get();
+
+ if (nodesResponse != null) {
+ nodes.addAll(nodesResponse.getNodes());
+ }
+ } catch (Throwable e) {
+ LOG.warn("Failed to get nodes report ", e);
+ }
+ }
+
+ // Delete duplicates from the node reports returned by all the available
+ // Yarn RMs. Nodes can be moved from one SubCluster to another; in that
+ // case they show up as LOST/RUNNING in the previous SubCluster and as
+ // NEW/RUNNING in the new one.
+
+ return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
}
+ /**
+ * The Yarn Router will forward the request to all the Yarn RMs in parallel
+ * and add up the ClusterMetricsInfo they return. A SubCluster whose RM fails
+ * to answer is skipped, so the result may only cover part of the federation.
+ */
@Override
public ClusterMetricsInfo getClusterMetricsInfo() {
- throw new NotImplementedException();
+ ClusterMetricsInfo metrics = new ClusterMetricsInfo();
+ Map<SubClusterId, SubClusterInfo> subClustersActive = null;
+ try {
+ subClustersActive = federationFacade.getSubClusters(true);
+ } catch (YarnException e) {
+ LOG.error(e.getLocalizedMessage(), e);
+ return metrics;
+ }
+
+ // Send the requests in parallel
+ ExecutorCompletionService<ClusterMetricsInfo> compSvc =
+ new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool);
+
+ for (final SubClusterInfo info : subClustersActive.values()) {
+ compSvc.submit(new Callable<ClusterMetricsInfo>() {
+ @Override
+ public ClusterMetricsInfo call() {
+ DefaultRequestInterceptorREST interceptor =
+ getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
+ info.getClientRMServiceAddress());
+ try {
+ return interceptor.getClusterMetricsInfo();
+ } catch (Exception e) {
+ LOG.error("Subcluster " + info.getSubClusterId()
+ + " failed to return Cluster Metrics.", e);
+ return null;
+ }
+ }
+ });
+ }
+
+ // Collect all the responses in parallel
+ for (int i = 0; i < subClustersActive.size(); i++) {
+ try {
+ Future<ClusterMetricsInfo> future = compSvc.take();
+ ClusterMetricsInfo metricsResponse = future.get();
+
+ if (metricsResponse != null) {
+ RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse);
+ }
+ } catch (Throwable e) {
+ LOG.warn("Failed to get cluster metrics ", e);
+ }
+ }
+ return metrics;
}
@Override
- public SchedulerTypeInfo getSchedulerInfo() {
- throw new NotImplementedException();
+ public ClusterInfo get() {
+ return getClusterInfo();
}
@Override
- public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
- throws IOException {
+ public ClusterInfo getClusterInfo() {
throw new NotImplementedException();
}
@Override
- public NodesInfo getNodes(String states) {
+ public SchedulerTypeInfo getSchedulerInfo() {
throw new NotImplementedException();
}
@Override
- public NodeInfo getNode(String nodeId) {
+ public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
+ throws IOException {
throw new NotImplementedException();
}
@@ -933,4 +1134,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
+ "in the chain. Check if the interceptor pipeline configuration "
+ "is correct");
}
+
+ /**
+ * Shuts down the thread pool used to send the requests to the SubClusters
+ * in parallel, if it was created.
+ */
+ @Override
+ public void shutdown() {
+ if (threadpool != null) {
+ threadpool.shutdown();
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
index e633b6a..e769a86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -39,6 +40,9 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.apache.hadoop.yarn.webapp.BadRequestException;
import org.apache.hadoop.yarn.webapp.ForbiddenException;
@@ -233,10 +237,10 @@ public final class RouterWebServiceUtil {
}
/**
- * Merges a list of AppInfo grouping by ApplicationId. Our current policy
- * is to merge the application reports from the reacheable SubClusters.
- * Via configuration parameter, we decide whether to return applications
- * for which the primary AM is missing or to omit them.
+ * Merges a list of AppInfo grouping by ApplicationId. Our current policy is
+ * to merge the application reports from the reachable SubClusters. Via
+ * configuration parameter, we decide whether to return applications for which
+ * the primary AM is missing or to omit them.
*
* @param appsInfo a list of AppInfo to merge
* @param returnPartialResult if the merge AppsInfo should contain partial
@@ -331,4 +335,93 @@ public final class RouterWebServiceUtil {
am.setVcoreSeconds(am.getVcoreSeconds() + uam.getVcoreSeconds());
}
}
+
+ /**
+ * Deletes all the duplicate NodeInfo by discarding the old instances.
+ * <p>
+ * Nodes are grouped by NodeId. When several SubClusters report the same
+ * NodeId, the instance with the highest LastHealthUpdate is kept; on a tie
+ * the instance seen first wins. The output keeps the order in which each
+ * NodeId was first seen.
+ *
+ * @param nodes a list of NodeInfo to check for duplicates
+ * @return a NodesInfo that contains a list of NodeInfos without duplicates
+ */
+ public static NodesInfo deleteDuplicateNodesInfo(ArrayList<NodeInfo> nodes) {
+ Map<String, NodeInfo> latestById = new LinkedHashMap<>();
+ for (NodeInfo candidate : nodes) {
+ String id = candidate.getNodeId();
+ NodeInfo known = latestById.get(id);
+ if (known == null
+ || known.getLastHealthUpdate() < candidate.getLastHealthUpdate()) {
+ latestById.put(id, candidate);
+ }
+ }
+
+ NodesInfo deduplicated = new NodesInfo();
+ deduplicated.addAll(new ArrayList<NodeInfo>(latestById.values()));
+ return deduplicated;
+ }
+
+ /**
+ * Adds all the values from the second ClusterMetricsInfo to the first one.
+ *
+ * @param metrics the ClusterMetricsInfo we want to update
+ * @param metricsResponse the ClusterMetricsInfo whose values are added
+ */
+ public static void mergeMetrics(ClusterMetricsInfo metrics,
+ ClusterMetricsInfo metricsResponse) {
+ metrics.setAppsSubmitted(
+ metrics.getAppsSubmitted() + metricsResponse.getAppsSubmitted());
+ metrics.setAppsCompleted(
+ metrics.getAppsCompleted() + metricsResponse.getAppsCompleted());
+ metrics.setAppsPending(
+ metrics.getAppsPending() + metricsResponse.getAppsPending());
+ metrics.setAppsRunning(
+ metrics.getAppsRunning() + metricsResponse.getAppsRunning());
+ metrics.setAppsFailed(
+ metrics.getAppsFailed() + metricsResponse.getAppsFailed());
+ metrics.setAppsKilled(
+ metrics.getAppsKilled() + metricsResponse.getAppsKilled());
+
+ metrics.setReservedMB(
+ metrics.getReservedMB() + metricsResponse.getReservedMB());
+ metrics.setAvailableMB(
+ metrics.getAvailableMB() + metricsResponse.getAvailableMB());
+ metrics.setAllocatedMB(
+ metrics.getAllocatedMB() + metricsResponse.getAllocatedMB());
+ metrics.setReservedVirtualCores(metrics.getReservedVirtualCores()
+ + metricsResponse.getReservedVirtualCores());
+ metrics.setAvailableVirtualCores(metrics.getAvailableVirtualCores()
+ + metricsResponse.getAvailableVirtualCores());
+ metrics.setAllocatedVirtualCores(metrics.getAllocatedVirtualCores()
+ + metricsResponse.getAllocatedVirtualCores());
+
+ metrics.setContainersAllocated(metrics.getContainersAllocated()
+ + metricsResponse.getContainersAllocated());
+ metrics.setContainersReserved(metrics.getReservedContainers()
+ + metricsResponse.getReservedContainers());
+ metrics.setContainersPending(metrics.getPendingContainers()
+ + metricsResponse.getPendingContainers());
+ metrics.setTotalMB(metrics.getTotalMB() + metricsResponse.getTotalMB());
+ metrics.setTotalVirtualCores(metrics.getTotalVirtualCores()
+ + metricsResponse.getTotalVirtualCores());
+ metrics.setTotalNodes(
+ metrics.getTotalNodes() + metricsResponse.getTotalNodes());
+ metrics.setLostNodes(
+ metrics.getLostNodes() + metricsResponse.getLostNodes());
+ metrics.setUnhealthyNodes(
+ metrics.getUnhealthyNodes() + metricsResponse.getUnhealthyNodes());
+ metrics.setDecommissioningNodes(metrics.getDecommissioningNodes()
+ + metricsResponse.getDecommissioningNodes());
+ metrics.setDecommissionedNodes(metrics.getDecommissionedNodes()
+ + metricsResponse.getDecommissionedNodes());
+ metrics.setRebootedNodes(
+ metrics.getRebootedNodes() + metricsResponse.getRebootedNodes());
+ metrics.setActiveNodes(
+ metrics.getActiveNodes() + metricsResponse.getActiveNodes());
+ metrics.setShutdownNodes(
+ metrics.getShutdownNodes() + metricsResponse.getShutdownNodes());
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index 93527e5..6afecae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -38,7 +38,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.slf4j.Logger;
@@ -149,6 +152,46 @@ public class MockDefaultRequestInterceptorREST
return Response.status(Status.OK).entity(ret).build();
}
+ @Override
+ public NodeInfo getNode(String nodeId) {
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+ NodeInfo mockNode = new NodeInfo();
+ mockNode.setId(nodeId);
+ mockNode.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
+ return mockNode;
+ }
+
+ @Override
+ public NodesInfo getNodes(String states) {
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+ NodeInfo mockNode = new NodeInfo();
+ mockNode.setLastHealthUpdate(Integer.parseInt(getSubClusterId().getId()));
+ mockNode.setId("Node " + mockNode.getLastHealthUpdate());
+ NodesInfo mockNodes = new NodesInfo();
+ mockNodes.add(mockNode);
+ return mockNodes;
+ }
+
+ @Override
+ public ClusterMetricsInfo getClusterMetricsInfo() {
+ if (!isRunning) {
+ throw new RuntimeException("RM is stopped");
+ }
+ int value = Integer.parseInt(getSubClusterId().getId());
+ ClusterMetricsInfo mockMetrics = new ClusterMetricsInfo();
+ mockMetrics.setAppsSubmitted(value);
+ mockMetrics.setAppsCompleted(value);
+ mockMetrics.setAppsPending(value);
+ mockMetrics.setAppsRunning(value);
+ mockMetrics.setAppsFailed(value);
+ mockMetrics.setAppsKilled(value);
+ return mockMetrics;
+ }
+
public void setSubClusterId(int subClusterId) {
setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId)));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index 2ee62af..fae4ecf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -36,7 +36,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -388,7 +391,56 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
Assert.assertNotNull(responseGet);
Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getApps().size());
- // The merged operations will be tested in TestRouterWebServiceUtil
+ // The merged operations is tested in TestRouterWebServiceUtil
+ }
+
+  /**
+   * This test validates the correctness of GetNode in case each SubCluster
+   * provided one node with the LastHealthUpdate set to the SubClusterId. The
+   * expected result is the node report with the most recent LastHealthUpdate,
+   * i.e. the one from the SubCluster whose id is NUM_SUBCLUSTER - 1.
+   */
+  @Test
+  public void testGetNode() {
+
+    NodeInfo responseGet = interceptor.getNode("testGetNode");
+
+    Assert.assertNotNull(responseGet);
+    Assert.assertEquals(NUM_SUBCLUSTER - 1, responseGet.getLastHealthUpdate());
+  }
+
+  /**
+   * Validates GetNodes when every SubCluster reports exactly one node, each
+   * with a distinct id: the federated response must contain one node per
+   * SubCluster.
+   */
+  @Test
+  public void testGetNodes() {
+    NodesInfo merged = interceptor.getNodes(null);
+    Assert.assertNotNull(merged);
+
+    int nodeCount = merged.getNodes().size();
+    Assert.assertEquals(NUM_SUBCLUSTER, nodeCount);
+    // Duplicate removal itself is covered by TestRouterWebServiceUtil.
+  }
+
+  /**
+   * Validates getClusterMetricsInfo when each SubCluster reports a
+   * ClusterMetricsInfo whose appsSubmitted equals its (integer) SubClusterId.
+   * The merged appsSubmitted must therefore be the sum of all SubClusterIds,
+   * 0 + 1 + ... + (NUM_SUBCLUSTER - 1).
+   */
+  @Test
+  public void testGetClusterMetrics() {
+    ClusterMetricsInfo merged = interceptor.getClusterMetricsInfo();
+    Assert.assertNotNull(merged);
+
+    // Closed form of the arithmetic series 0 + 1 + ... + (NUM_SUBCLUSTER - 1).
+    int expectedAppsSubmitted = NUM_SUBCLUSTER * (NUM_SUBCLUSTER - 1) / 2;
+    Assert.assertEquals(expectedAppsSubmitted, merged.getAppsSubmitted());
+    // Merging itself is covered by TestRouterWebServiceUtil.
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
index 38b1027..e7b28b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
@@ -37,9 +37,13 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
@@ -88,9 +92,9 @@ public class TestFederationInterceptorRESTRetry
interceptor.init(user);
// Create SubClusters
- good = SubClusterId.newInstance("0");
- bad1 = SubClusterId.newInstance("1");
- bad2 = SubClusterId.newInstance("2");
+ good = SubClusterId.newInstance("1");
+ bad1 = SubClusterId.newInstance("2");
+ bad2 = SubClusterId.newInstance("3");
scs.add(good);
scs.add(bad1);
scs.add(bad2);
@@ -316,4 +320,201 @@ public class TestFederationInterceptorRESTRetry
Assert.assertEquals(1, response.getApps().size());
}
+  /**
+   * Validates GetNode when the cluster consists of a single bad SubCluster:
+   * the lookup must fail with a NotFoundException that names the node.
+   */
+  @Test
+  public void testGetNodeOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad2));
+
+    NotFoundException caught = null;
+    try {
+      interceptor.getNode("testGetNodeOneBadSC");
+    } catch (NotFoundException e) {
+      caught = e;
+    }
+    if (caught == null) {
+      Assert.fail();
+    }
+    Assert.assertTrue(caught.getMessage()
+        .contains("nodeId, testGetNodeOneBadSC, is not found"));
+  }
+
+  /**
+   * Validates GetNode when the cluster consists of two bad SubClusters: the
+   * lookup must fail with a NotFoundException that names the node.
+   */
+  @Test
+  public void testGetNodeTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    NotFoundException caught = null;
+    try {
+      interceptor.getNode("testGetNodeTwoBadSCs");
+    } catch (NotFoundException e) {
+      caught = e;
+    }
+    if (caught == null) {
+      Assert.fail();
+    }
+    Assert.assertTrue(caught.getMessage()
+        .contains("nodeId, testGetNodeTwoBadSCs, is not found"));
+  }
+
+  /**
+   * Validates GetNode when the cluster has one bad and one good SubCluster:
+   * the returned node must come from the good SubCluster.
+   */
+  @Test
+  public void testGetNodeOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(good, bad2));
+
+    NodeInfo node = interceptor.getNode(null);
+    Assert.assertNotNull(node);
+
+    // The good SubCluster reports LastHealthUpdate equal to its id, which
+    // identifies the SubCluster that answered.
+    String answeringSubCluster = Long.toString(node.getLastHealthUpdate());
+    Assert.assertEquals(good.getId(), answeringSubCluster);
+  }
+
+  /**
+   * Validates GetNodes when the cluster consists of a single bad SubCluster:
+   * the call must still return a (non-null) empty node list.
+   */
+  @Test
+  public void testGetNodesOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad2));
+
+    NodesInfo merged = interceptor.getNodes(null);
+    Assert.assertNotNull(merged);
+
+    int nodeCount = merged.getNodes().size();
+    Assert.assertEquals(0, nodeCount);
+    // Duplicate removal itself is covered by TestRouterWebServiceUtil.
+  }
+
+  /**
+   * Validates GetNodes when the cluster consists of two bad SubClusters: the
+   * call must still return a (non-null) empty node list.
+   */
+  @Test
+  public void testGetNodesTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    NodesInfo merged = interceptor.getNodes(null);
+    Assert.assertNotNull(merged);
+
+    int nodeCount = merged.getNodes().size();
+    Assert.assertEquals(0, nodeCount);
+    // Duplicate removal itself is covered by TestRouterWebServiceUtil.
+  }
+
+  /**
+   * Validates GetNodes when the cluster has one bad and one good SubCluster:
+   * exactly one node is returned and it comes from the good SubCluster.
+   */
+  @Test
+  public void testGetNodesOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(good, bad2));
+
+    NodesInfo merged = interceptor.getNodes(null);
+    Assert.assertNotNull(merged);
+    Assert.assertEquals(1, merged.getNodes().size());
+
+    // LastHealthUpdate carries the reporting SubClusterId, so the single node
+    // must have been reported by the good SubCluster.
+    NodeInfo onlyNode = merged.getNodes().get(0);
+    Assert.assertEquals(good.getId(),
+        Long.toString(onlyNode.getLastHealthUpdate()));
+    // Duplicate removal itself is covered by TestRouterWebServiceUtil.
+  }
+
+  /**
+   * This test validates the correctness of GetClusterMetrics in case the
+   * cluster is composed of only 1 bad SubCluster. The expected result would be
+   * a ClusterMetricsInfo with all its values set to 0.
+   */
+  @Test
+  public void testGetClusterMetricsOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad2));
+
+    ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
+    Assert.assertNotNull(response);
+    // The only SubCluster failed, so every counter must still be 0.
+    checkEmptyMetrics(response);
+  }
+
+  /**
+   * This test validates the correctness of GetClusterMetrics in case the
+   * cluster is composed of only 2 bad SubClusters. The expected result would be
+   * a ClusterMetricsInfo with all its values set to 0.
+   */
+  @Test
+  public void testGetClusterMetricsTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
+    Assert.assertNotNull(response);
+    // Both SubClusters failed, so every counter (not only appsSubmitted) must
+    // be 0, matching the contract stated in the Javadoc above.
+    checkEmptyMetrics(response);
+  }
+
+  /**
+   * Validates GetClusterMetrics when the cluster has one bad and one good
+   * SubCluster. The good SubCluster reports app counters equal to its
+   * (integer) SubClusterId, so the merged result must carry exactly those
+   * values.
+   */
+  @Test
+  public void testGetClusterMetricsOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(good, bad2));
+
+    ClusterMetricsInfo merged = interceptor.getClusterMetricsInfo();
+    Assert.assertNotNull(merged);
+    checkMetricsFromGoodSC(merged);
+    // Merging itself is covered by TestRouterWebServiceUtil.
+  }
+
+  /**
+   * Asserts that every application counter in the response equals the
+   * numeric id of the good SubCluster.
+   */
+  private void checkMetricsFromGoodSC(ClusterMetricsInfo response) {
+    int expected = Integer.parseInt(good.getId());
+    Assert.assertEquals(expected, response.getAppsSubmitted());
+    Assert.assertEquals(expected, response.getAppsCompleted());
+    Assert.assertEquals(expected, response.getAppsPending());
+    Assert.assertEquals(expected, response.getAppsRunning());
+    Assert.assertEquals(expected, response.getAppsFailed());
+    Assert.assertEquals(expected, response.getAppsKilled());
+  }
+
+  /**
+   * Asserts that every counter of the response is 0, i.e. that no SubCluster
+   * contributed anything to it.
+   */
+  private void checkEmptyMetrics(ClusterMetricsInfo response) {
+    // Application counters.
+    Assert.assertEquals(0L, response.getAppsSubmitted());
+    Assert.assertEquals(0L, response.getAppsCompleted());
+    Assert.assertEquals(0L, response.getAppsPending());
+    Assert.assertEquals(0L, response.getAppsRunning());
+    Assert.assertEquals(0L, response.getAppsFailed());
+    Assert.assertEquals(0L, response.getAppsKilled());
+
+    // Memory.
+    Assert.assertEquals(0L, response.getReservedMB());
+    Assert.assertEquals(0L, response.getAvailableMB());
+    Assert.assertEquals(0L, response.getAllocatedMB());
+
+    // Virtual cores.
+    Assert.assertEquals(0L, response.getReservedVirtualCores());
+    Assert.assertEquals(0L, response.getAvailableVirtualCores());
+    Assert.assertEquals(0L, response.getAllocatedVirtualCores());
+
+    // Containers.
+    Assert.assertEquals(0L, response.getContainersAllocated());
+    Assert.assertEquals(0L, response.getReservedContainers());
+    Assert.assertEquals(0L, response.getPendingContainers());
+
+    // Cluster totals and node states.
+    Assert.assertEquals(0L, response.getTotalMB());
+    Assert.assertEquals(0L, response.getTotalVirtualCores());
+    Assert.assertEquals(0L, response.getTotalNodes());
+    Assert.assertEquals(0L, response.getLostNodes());
+    Assert.assertEquals(0L, response.getUnhealthyNodes());
+    Assert.assertEquals(0L, response.getDecommissioningNodes());
+    Assert.assertEquals(0L, response.getDecommissionedNodes());
+    Assert.assertEquals(0L, response.getRebootedNodes());
+    Assert.assertEquals(0L, response.getActiveNodes());
+    Assert.assertEquals(0L, response.getShutdownNodes());
+  }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
index 810432a..7073b3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
@@ -20,11 +20,15 @@ package org.apache.hadoop.yarn.server.router.webapp;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
import org.junit.Assert;
@@ -40,6 +44,11 @@ public class TestRouterWebServiceUtil {
private static final ApplicationId APPID3 = ApplicationId.newInstance(3, 1);
private static final ApplicationId APPID4 = ApplicationId.newInstance(4, 1);
+ // Node ids used to build NodeInfo fixtures for the deleteDuplicateNodesInfo
+ // tests below.
+ private static final String NODE1 = "Node1";
+ private static final String NODE2 = "Node2";
+ private static final String NODE3 = "Node3";
+ private static final String NODE4 = "Node4";
+
/**
* This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
* in case we want to merge 4 AMs. The expected result would be the same 4
@@ -308,4 +317,257 @@ public class TestRouterWebServiceUtil {
Assert.assertNotNull(result);
Assert.assertEquals(1, result.getApps().size());
}
+
+  /**
+   * Validates RouterWebServiceUtil#deleteDuplicateNodesInfo when four nodes
+   * with distinct ids are passed in: none is a duplicate, so all four must be
+   * returned.
+   */
+  @Test
+  public void testDeleteDuplicate4DifferentNodes() {
+    String[] ids = {NODE1, NODE2, NODE3, NODE4};
+
+    NodesInfo nodes = new NodesInfo();
+    for (String id : ids) {
+      NodeInfo nodeInfo = new NodeInfo();
+      nodeInfo.setId(id);
+      nodes.add(nodeInfo);
+    }
+
+    NodesInfo result =
+        RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
+    Assert.assertNotNull(result);
+    Assert.assertEquals(4, result.getNodes().size());
+
+    List<String> returnedIds = new ArrayList<String>();
+    for (NodeInfo node : result.getNodes()) {
+      returnedIds.add(node.getNodeId());
+    }
+    for (String id : ids) {
+      Assert.assertTrue(returnedIds.contains(id));
+    }
+  }
+
+  /**
+   * Validates
+   * {@link RouterWebServiceUtil#deleteDuplicateNodesInfo(ArrayList)} when
+   * three reports share the same node id: only one report may survive, namely
+   * the one with the newest LastHealthUpdate.
+   */
+  @Test
+  public void testDeleteDuplicateNodes() {
+    NodesInfo nodes = new NodesInfo();
+    // Three reports for NODE1 with LastHealthUpdate 0, 1 and 2.
+    for (int healthUpdate = 0; healthUpdate < 3; healthUpdate++) {
+      NodeInfo report = new NodeInfo();
+      report.setId(NODE1);
+      report.setLastHealthUpdate(healthUpdate);
+      nodes.add(report);
+    }
+
+    NodesInfo result =
+        RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
+    Assert.assertNotNull(result);
+    Assert.assertEquals(1, result.getNodes().size());
+
+    NodeInfo survivor = result.getNodes().get(0);
+    Assert.assertEquals(NODE1, survivor.getNodeId());
+    Assert.assertEquals(2, survivor.getLastHealthUpdate());
+  }
+
+  /**
+   * Validates {@link RouterWebServiceUtil#mergeMetrics}: after merging a
+   * second ClusterMetricsInfo into the first, every counter of the first must
+   * equal its own original value plus the second one's value.
+   */
+  @Test
+  public void testMergeMetrics() {
+    ClusterMetricsInfo merged = new ClusterMetricsInfo();
+    ClusterMetricsInfo other = new ClusterMetricsInfo();
+
+    setUpClusterMetrics(merged);
+    setUpClusterMetrics(other);
+    // Snapshot the target, because mergeMetrics updates it in place.
+    ClusterMetricsInfo before = createClusterMetricsClone(merged);
+    RouterWebServiceUtil.mergeMetrics(merged, other);
+
+    // Application counters.
+    Assert.assertEquals(
+        other.getAppsSubmitted() + before.getAppsSubmitted(),
+        merged.getAppsSubmitted());
+    Assert.assertEquals(
+        other.getAppsCompleted() + before.getAppsCompleted(),
+        merged.getAppsCompleted());
+    Assert.assertEquals(
+        other.getAppsPending() + before.getAppsPending(),
+        merged.getAppsPending());
+    Assert.assertEquals(
+        other.getAppsRunning() + before.getAppsRunning(),
+        merged.getAppsRunning());
+    Assert.assertEquals(
+        other.getAppsFailed() + before.getAppsFailed(),
+        merged.getAppsFailed());
+    Assert.assertEquals(
+        other.getAppsKilled() + before.getAppsKilled(),
+        merged.getAppsKilled());
+
+    // Memory.
+    Assert.assertEquals(
+        other.getReservedMB() + before.getReservedMB(),
+        merged.getReservedMB());
+    Assert.assertEquals(
+        other.getAvailableMB() + before.getAvailableMB(),
+        merged.getAvailableMB());
+    Assert.assertEquals(
+        other.getAllocatedMB() + before.getAllocatedMB(),
+        merged.getAllocatedMB());
+
+    // Virtual cores.
+    Assert.assertEquals(
+        other.getReservedVirtualCores() + before.getReservedVirtualCores(),
+        merged.getReservedVirtualCores());
+    Assert.assertEquals(
+        other.getAvailableVirtualCores() + before.getAvailableVirtualCores(),
+        merged.getAvailableVirtualCores());
+    Assert.assertEquals(
+        other.getAllocatedVirtualCores() + before.getAllocatedVirtualCores(),
+        merged.getAllocatedVirtualCores());
+
+    // Containers.
+    Assert.assertEquals(
+        other.getContainersAllocated() + before.getContainersAllocated(),
+        merged.getContainersAllocated());
+    Assert.assertEquals(
+        other.getReservedContainers() + before.getReservedContainers(),
+        merged.getReservedContainers());
+    Assert.assertEquals(
+        other.getPendingContainers() + before.getPendingContainers(),
+        merged.getPendingContainers());
+
+    // Cluster totals and node states.
+    Assert.assertEquals(
+        other.getTotalMB() + before.getTotalMB(),
+        merged.getTotalMB());
+    Assert.assertEquals(
+        other.getTotalVirtualCores() + before.getTotalVirtualCores(),
+        merged.getTotalVirtualCores());
+    Assert.assertEquals(
+        other.getTotalNodes() + before.getTotalNodes(),
+        merged.getTotalNodes());
+    Assert.assertEquals(
+        other.getLostNodes() + before.getLostNodes(),
+        merged.getLostNodes());
+    Assert.assertEquals(
+        other.getUnhealthyNodes() + before.getUnhealthyNodes(),
+        merged.getUnhealthyNodes());
+    Assert.assertEquals(
+        other.getDecommissioningNodes() + before.getDecommissioningNodes(),
+        merged.getDecommissioningNodes());
+    Assert.assertEquals(
+        other.getDecommissionedNodes() + before.getDecommissionedNodes(),
+        merged.getDecommissionedNodes());
+    Assert.assertEquals(
+        other.getRebootedNodes() + before.getRebootedNodes(),
+        merged.getRebootedNodes());
+    Assert.assertEquals(
+        other.getActiveNodes() + before.getActiveNodes(),
+        merged.getActiveNodes());
+    Assert.assertEquals(
+        other.getShutdownNodes() + before.getShutdownNodes(),
+        merged.getShutdownNodes());
+  }
+
+  /**
+   * Copies every counter of the given ClusterMetricsInfo into a new instance,
+   * so the original values are still available after an in-place merge.
+   */
+  private ClusterMetricsInfo createClusterMetricsClone(
+      ClusterMetricsInfo metrics) {
+    ClusterMetricsInfo copy = new ClusterMetricsInfo();
+
+    // Application counters.
+    copy.setAppsSubmitted(metrics.getAppsSubmitted());
+    copy.setAppsCompleted(metrics.getAppsCompleted());
+    copy.setAppsPending(metrics.getAppsPending());
+    copy.setAppsRunning(metrics.getAppsRunning());
+    copy.setAppsFailed(metrics.getAppsFailed());
+    copy.setAppsKilled(metrics.getAppsKilled());
+
+    // Memory.
+    copy.setReservedMB(metrics.getReservedMB());
+    copy.setAvailableMB(metrics.getAvailableMB());
+    copy.setAllocatedMB(metrics.getAllocatedMB());
+
+    // Virtual cores.
+    copy.setReservedVirtualCores(metrics.getReservedVirtualCores());
+    copy.setAvailableVirtualCores(metrics.getAvailableVirtualCores());
+    copy.setAllocatedVirtualCores(metrics.getAllocatedVirtualCores());
+
+    // Containers.
+    copy.setContainersAllocated(metrics.getContainersAllocated());
+    copy.setContainersReserved(metrics.getReservedContainers());
+    copy.setContainersPending(metrics.getPendingContainers());
+
+    // Cluster totals and node states.
+    copy.setTotalMB(metrics.getTotalMB());
+    copy.setTotalVirtualCores(metrics.getTotalVirtualCores());
+    copy.setTotalNodes(metrics.getTotalNodes());
+    copy.setLostNodes(metrics.getLostNodes());
+    copy.setUnhealthyNodes(metrics.getUnhealthyNodes());
+    copy.setDecommissioningNodes(metrics.getDecommissioningNodes());
+    copy.setDecommissionedNodes(metrics.getDecommissionedNodes());
+    copy.setRebootedNodes(metrics.getRebootedNodes());
+    copy.setActiveNodes(metrics.getActiveNodes());
+    copy.setShutdownNodes(metrics.getShutdownNodes());
+    return copy;
+  }
+
+  /**
+   * Fills every counter of the given ClusterMetricsInfo with a pseudo-random
+   * value in [0, 1000).
+   *
+   * A fresh unseeded Random is used instead of one seeded from
+   * System.currentTimeMillis(). With a time-based seed, two calls made within
+   * the same millisecond (as the back-to-back calls in testMergeMetrics
+   * typically are) share a seed and produce identical metrics. That makes both
+   * merge inputs indistinguishable, so a merge that confuses its two operands
+   * would still pass.
+   */
+  private void setUpClusterMetrics(ClusterMetricsInfo metrics) {
+    Random rand = new Random();
+    metrics.setAppsSubmitted(rand.nextInt(1000));
+    metrics.setAppsCompleted(rand.nextInt(1000));
+    metrics.setAppsPending(rand.nextInt(1000));
+    metrics.setAppsRunning(rand.nextInt(1000));
+    metrics.setAppsFailed(rand.nextInt(1000));
+    metrics.setAppsKilled(rand.nextInt(1000));
+
+    metrics.setReservedMB(rand.nextInt(1000));
+    metrics.setAvailableMB(rand.nextInt(1000));
+    metrics.setAllocatedMB(rand.nextInt(1000));
+
+    metrics.setReservedVirtualCores(rand.nextInt(1000));
+    metrics.setAvailableVirtualCores(rand.nextInt(1000));
+    metrics.setAllocatedVirtualCores(rand.nextInt(1000));
+
+    metrics.setContainersAllocated(rand.nextInt(1000));
+    metrics.setContainersReserved(rand.nextInt(1000));
+    metrics.setContainersPending(rand.nextInt(1000));
+
+    metrics.setTotalMB(rand.nextInt(1000));
+    metrics.setTotalVirtualCores(rand.nextInt(1000));
+    metrics.setTotalNodes(rand.nextInt(1000));
+    metrics.setLostNodes(rand.nextInt(1000));
+    metrics.setUnhealthyNodes(rand.nextInt(1000));
+    metrics.setDecommissioningNodes(rand.nextInt(1000));
+    metrics.setDecommissionedNodes(rand.nextInt(1000));
+    metrics.setRebootedNodes(rand.nextInt(1000));
+    metrics.setActiveNodes(rand.nextInt(1000));
+    metrics.setShutdownNodes(rand.nextInt(1000));
+  }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org