You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/08/31 22:05:53 UTC

hadoop git commit: YARN-7095. Federation: routing getNode/getNodes/getMetrics REST invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).

Repository: hadoop
Updated Branches:
  refs/heads/trunk d4417dae4 -> bac4e8cca


YARN-7095. Federation: routing getNode/getNodes/getMetrics REST invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bac4e8cc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bac4e8cc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bac4e8cc

Branch: refs/heads/trunk
Commit: bac4e8cca8b54405f5e37b90e545b93bbadee0f4
Parents: d4417da
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Aug 31 15:05:41 2017 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Thu Aug 31 15:05:41 2017 -0700

----------------------------------------------------------------------
 .../webapp/dao/ClusterMetricsInfo.java          | 162 +++++++++---
 .../resourcemanager/webapp/dao/NodeInfo.java    |  17 +-
 .../resourcemanager/webapp/dao/NodesInfo.java   |   4 +
 .../webapp/FederationInterceptorREST.java       | 234 ++++++++++++++++-
 .../router/webapp/RouterWebServiceUtil.java     | 101 ++++++-
 .../MockDefaultRequestInterceptorREST.java      |  43 +++
 .../webapp/TestFederationInterceptorREST.java   |  54 +++-
 .../TestFederationInterceptorRESTRetry.java     | 207 ++++++++++++++-
 .../router/webapp/TestRouterWebServiceUtil.java | 262 +++++++++++++++++++
 9 files changed, 1030 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
index dc42eb6..3214cb9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/ClusterMetricsInfo.java
@@ -31,35 +31,35 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 @XmlAccessorType(XmlAccessType.FIELD)
 public class ClusterMetricsInfo {
 
-  protected int appsSubmitted;
-  protected int appsCompleted;
-  protected int appsPending;
-  protected int appsRunning;
-  protected int appsFailed;
-  protected int appsKilled;
-
-  protected long reservedMB;
-  protected long availableMB;
-  protected long allocatedMB;
-
-  protected long reservedVirtualCores;
-  protected long availableVirtualCores;
-  protected long allocatedVirtualCores;
-
-  protected int containersAllocated;
-  protected int containersReserved;
-  protected int containersPending;
-
-  protected long totalMB;
-  protected long totalVirtualCores;
-  protected int totalNodes;
-  protected int lostNodes;
-  protected int unhealthyNodes;
-  protected int decommissioningNodes;
-  protected int decommissionedNodes;
-  protected int rebootedNodes;
-  protected int activeNodes;
-  protected int shutdownNodes;
+  private int appsSubmitted;
+  private int appsCompleted;
+  private int appsPending;
+  private int appsRunning;
+  private int appsFailed;
+  private int appsKilled;
+
+  private long reservedMB;
+  private long availableMB;
+  private long allocatedMB;
+
+  private long reservedVirtualCores;
+  private long availableVirtualCores;
+  private long allocatedVirtualCores;
+
+  private int containersAllocated;
+  private int containersReserved;
+  private int containersPending;
+
+  private long totalMB;
+  private long totalVirtualCores;
+  private int totalNodes;
+  private int lostNodes;
+  private int unhealthyNodes;
+  private int decommissioningNodes;
+  private int decommissionedNodes;
+  private int rebootedNodes;
+  private int activeNodes;
+  private int shutdownNodes;
 
   public ClusterMetricsInfo() {
   } // JAXB needs this
@@ -93,8 +93,8 @@ public class ClusterMetricsInfo {
 
     if (rs instanceof CapacityScheduler) {
       this.totalMB = availableMB + allocatedMB + reservedMB;
-      this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores
-          + containersReserved;
+      this.totalVirtualCores =
+          availableVirtualCores + allocatedVirtualCores + containersReserved;
     } else {
       this.totalMB = availableMB + allocatedMB;
       this.totalVirtualCores = availableVirtualCores + allocatedVirtualCores;
@@ -210,4 +210,104 @@ public class ClusterMetricsInfo {
     return this.shutdownNodes;
   }
 
+  public void setContainersReserved(int containersReserved) {
+    this.containersReserved = containersReserved;
+  }
+
+  public void setContainersPending(int containersPending) {
+    this.containersPending = containersPending;
+  }
+
+  public void setAppsSubmitted(int appsSubmitted) {
+    this.appsSubmitted = appsSubmitted;
+  }
+
+  public void setAppsCompleted(int appsCompleted) {
+    this.appsCompleted = appsCompleted;
+  }
+
+  public void setAppsPending(int appsPending) {
+    this.appsPending = appsPending;
+  }
+
+  public void setAppsRunning(int appsRunning) {
+    this.appsRunning = appsRunning;
+  }
+
+  public void setAppsFailed(int appsFailed) {
+    this.appsFailed = appsFailed;
+  }
+
+  public void setAppsKilled(int appsKilled) {
+    this.appsKilled = appsKilled;
+  }
+
+  public void setReservedMB(long reservedMB) {
+    this.reservedMB = reservedMB;
+  }
+
+  public void setAvailableMB(long availableMB) {
+    this.availableMB = availableMB;
+  }
+
+  public void setAllocatedMB(long allocatedMB) {
+    this.allocatedMB = allocatedMB;
+  }
+
+  public void setReservedVirtualCores(long reservedVirtualCores) {
+    this.reservedVirtualCores = reservedVirtualCores;
+  }
+
+  public void setAvailableVirtualCores(long availableVirtualCores) {
+    this.availableVirtualCores = availableVirtualCores;
+  }
+
+  public void setAllocatedVirtualCores(long allocatedVirtualCores) {
+    this.allocatedVirtualCores = allocatedVirtualCores;
+  }
+
+  public void setContainersAllocated(int containersAllocated) {
+    this.containersAllocated = containersAllocated;
+  }
+
+  public void setTotalMB(long totalMB) {
+    this.totalMB = totalMB;
+  }
+
+  public void setTotalVirtualCores(long totalVirtualCores) {
+    this.totalVirtualCores = totalVirtualCores;
+  }
+
+  public void setTotalNodes(int totalNodes) {
+    this.totalNodes = totalNodes;
+  }
+
+  public void setLostNodes(int lostNodes) {
+    this.lostNodes = lostNodes;
+  }
+
+  public void setUnhealthyNodes(int unhealthyNodes) {
+    this.unhealthyNodes = unhealthyNodes;
+  }
+
+  public void setDecommissioningNodes(int decommissioningNodes) {
+    this.decommissioningNodes = decommissioningNodes;
+  }
+
+  public void setDecommissionedNodes(int decommissionedNodes) {
+    this.decommissionedNodes = decommissionedNodes;
+  }
+
+  public void setRebootedNodes(int rebootedNodes) {
+    this.rebootedNodes = rebootedNodes;
+  }
+
+  public void setActiveNodes(int activeNodes) {
+    this.activeNodes = activeNodes;
+  }
+
+  public void setShutdownNodes(int shutdownNodes) {
+    this.shutdownNodes = shutdownNodes;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
index 3416e52..2530c8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodeInfo.java
@@ -33,16 +33,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 
+import com.google.common.annotations.VisibleForTesting;
+
 @XmlRootElement(name = "node")
 @XmlAccessorType(XmlAccessType.FIELD)
 public class NodeInfo {
 
   protected String rack;
   protected NodeState state;
-  protected String id;
+  private String id;
   protected String nodeHostName;
   protected String nodeHTTPAddress;
-  protected long lastHealthUpdate;
+  private long lastHealthUpdate;
   protected String version;
   protected String healthReport;
   protected int numContainers;
@@ -184,4 +186,15 @@ public class NodeInfo {
   public ResourceUtilizationInfo getResourceUtilization() {
     return this.resourceUtilization;
   }
+
+  @VisibleForTesting
+  public void setId(String id) {
+    this.id = id;
+  }
+
+  @VisibleForTesting
+  public void setLastHealthUpdate(long lastHealthUpdate) {
+    this.lastHealthUpdate = lastHealthUpdate;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java
index 7dacd10..8174be0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/NodesInfo.java
@@ -39,4 +39,8 @@ public class NodesInfo {
   public ArrayList<NodeInfo> getNodes() {
     return node;
   }
+
+  public void addAll(ArrayList<NodeInfo> nodesInfo) {
+    node.addAll(nodesInfo);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 3a91e35..bfd35c5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -28,7 +28,6 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import javax.servlet.http.HttpServletRequest;
@@ -40,6 +39,7 @@ import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -85,10 +85,12 @@ import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.MonotonicClock;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * Extends the {@code AbstractRESTRequestInterceptor} class and provides an
@@ -136,7 +138,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
     routerMetrics = RouterMetrics.getMetrics();
-    threadpool = Executors.newCachedThreadPool();
+    threadpool = HadoopExecutors.newCachedThreadPool(new ThreadFactoryBuilder()
+        .setNameFormat("FederationInterceptorREST #%d").build());
 
     returnPartialReport =
         conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
@@ -695,39 +698,237 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         returnPartialReport);
   }
 
+  /**
+   * The Yarn Router will forward to the request to all the SubClusters to find
+   * where the node is running.
+   * <p>
+   * Possible failure:
+   * <p>
+   * Client: identical behavior as {@code RMWebServices}.
+   * <p>
+   * Router: the Client will timeout and resubmit the request.
+   * <p>
+   * ResourceManager: the Router will timeout and the call will fail.
+   * <p>
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
   @Override
-  public ClusterInfo get() {
-    return getClusterInfo();
+  public NodeInfo getNode(String nodeId) {
+    Map<SubClusterId, SubClusterInfo> subClustersActive = null;
+    try {
+      subClustersActive = federationFacade.getSubClusters(true);
+    } catch (YarnException e) {
+      throw new NotFoundException(e.getMessage());
+    }
+
+    if (subClustersActive.isEmpty()) {
+      throw new NotFoundException(
+          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
+    }
+
+    // Send the requests in parallel
+
+    ExecutorCompletionService<NodeInfo> compSvc =
+        new ExecutorCompletionService<NodeInfo>(this.threadpool);
+
+    for (final SubClusterInfo info : subClustersActive.values()) {
+      compSvc.submit(new Callable<NodeInfo>() {
+        @Override
+        public NodeInfo call() {
+          DefaultRequestInterceptorREST interceptor =
+              getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
+                  info.getClientRMServiceAddress());
+          try {
+            NodeInfo nodeInfo = interceptor.getNode(nodeId);
+            return nodeInfo;
+          } catch (Exception e) {
+            LOG.error("Subcluster " + info.getSubClusterId()
+                + " failed to return nodeInfo.");
+            return null;
+          }
+        }
+      });
+    }
+
+    // Collect all the responses in parallel
+    NodeInfo nodeInfo = null;
+    for (int i = 0; i < subClustersActive.values().size(); i++) {
+      try {
+        Future<NodeInfo> future = compSvc.take();
+        NodeInfo nodeResponse = future.get();
+
+        // Check if the node was found in this SubCluster
+        if (nodeResponse != null) {
+          // Check if the node was already found in a different SubCluster and
+          // it has an old health report
+          if (nodeInfo == null || nodeInfo.getLastHealthUpdate() < nodeResponse
+              .getLastHealthUpdate()) {
+            nodeInfo = nodeResponse;
+          }
+        }
+      } catch (Throwable e) {
+        LOG.warn("Failed to get node report ", e);
+      }
+    }
+    if (nodeInfo == null) {
+      throw new NotFoundException("nodeId, " + nodeId + ", is not found");
+    }
+    return nodeInfo;
   }
 
+  /**
+   * The Yarn Router will forward the request to all the Yarn RMs in parallel,
+   * after that it will remove all the duplicated NodeInfo by using the NodeId.
+   * <p>
+   * Possible failure:
+   * <p>
+   * Client: identical behavior as {@code RMWebServices}.
+   * <p>
+   * Router: the Client will timeout and resubmit the request.
+   * <p>
+   * ResourceManager: the Router calls each Yarn RM in parallel by using one
+   * thread for each Yarn RM. In case a Yarn RM fails, a single call will
+   * timeout. However the Router will use the NodesInfo it got, and provides a
+   * partial list to the client.
+   * <p>
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
   @Override
-  public ClusterInfo getClusterInfo() {
-    throw new NotImplementedException();
+  public NodesInfo getNodes(String states) {
+
+    NodesInfo nodes = new NodesInfo();
+
+    Map<SubClusterId, SubClusterInfo> subClustersActive = null;
+    try {
+      subClustersActive = federationFacade.getSubClusters(true);
+    } catch (YarnException e) {
+      LOG.error(e.getMessage());
+      return new NodesInfo();
+    }
+
+    // Send the requests in parallel
+
+    ExecutorCompletionService<NodesInfo> compSvc =
+        new ExecutorCompletionService<NodesInfo>(this.threadpool);
+
+    for (final SubClusterInfo info : subClustersActive.values()) {
+      compSvc.submit(new Callable<NodesInfo>() {
+        @Override
+        public NodesInfo call() {
+          DefaultRequestInterceptorREST interceptor =
+              getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
+                  info.getClientRMServiceAddress());
+          try {
+            NodesInfo nodesInfo = interceptor.getNodes(states);
+            return nodesInfo;
+          } catch (Exception e) {
+            LOG.error("Subcluster " + info.getSubClusterId()
+                + " failed to return nodesInfo.");
+            return null;
+          }
+        }
+      });
+    }
+
+    // Collect all the responses in parallel
+
+    for (int i = 0; i < subClustersActive.values().size(); i++) {
+      try {
+        Future<NodesInfo> future = compSvc.take();
+        NodesInfo nodesResponse = future.get();
+
+        if (nodesResponse != null) {
+          nodes.addAll(nodesResponse.getNodes());
+        }
+      } catch (Throwable e) {
+        LOG.warn("Failed to get nodes report ", e);
+      }
+    }
+
+    // Delete duplicate from all the node reports got from all the available
+    // Yarn RMs. Nodes can be moved from one subclusters to another. In this
+    // operation they result LOST/RUNNING in the previous SubCluster and
+    // NEW/RUNNING in the new one.
+
+    return RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
   }
 
   @Override
   public ClusterMetricsInfo getClusterMetricsInfo() {
-    throw new NotImplementedException();
+    ClusterMetricsInfo metrics = new ClusterMetricsInfo();
+
+    Map<SubClusterId, SubClusterInfo> subClustersActive = null;
+    try {
+      subClustersActive = federationFacade.getSubClusters(true);
+    } catch (YarnException e) {
+      LOG.error(e.getLocalizedMessage());
+      return metrics;
+    }
+
+    // Send the requests in parallel
+
+    ExecutorCompletionService<ClusterMetricsInfo> compSvc =
+        new ExecutorCompletionService<ClusterMetricsInfo>(this.threadpool);
+
+    for (final SubClusterInfo info : subClustersActive.values()) {
+      compSvc.submit(new Callable<ClusterMetricsInfo>() {
+        @Override
+        public ClusterMetricsInfo call() {
+          DefaultRequestInterceptorREST interceptor =
+              getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
+                  info.getClientRMServiceAddress());
+          try {
+            ClusterMetricsInfo metrics = interceptor.getClusterMetricsInfo();
+            return metrics;
+          } catch (Exception e) {
+            LOG.error("Subcluster " + info.getSubClusterId()
+                + " failed to return Cluster Metrics.");
+            return null;
+          }
+        }
+      });
+    }
+
+    // Collect all the responses in parallel
+
+    for (int i = 0; i < subClustersActive.values().size(); i++) {
+      try {
+        Future<ClusterMetricsInfo> future = compSvc.take();
+        ClusterMetricsInfo metricsResponse = future.get();
+
+        if (metricsResponse != null) {
+          RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse);
+        }
+      } catch (Throwable e) {
+        LOG.warn("Failed to get nodes report ", e);
+      }
+    }
+
+    return metrics;
   }
 
   @Override
-  public SchedulerTypeInfo getSchedulerInfo() {
-    throw new NotImplementedException();
+  public ClusterInfo get() {
+    return getClusterInfo();
   }
 
   @Override
-  public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
-      throws IOException {
+  public ClusterInfo getClusterInfo() {
     throw new NotImplementedException();
   }
 
   @Override
-  public NodesInfo getNodes(String states) {
+  public SchedulerTypeInfo getSchedulerInfo() {
     throw new NotImplementedException();
   }
 
   @Override
-  public NodeInfo getNode(String nodeId) {
+  public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
+      throws IOException {
     throw new NotImplementedException();
   }
 
@@ -933,4 +1134,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
         + "in the chain. Check if the interceptor pipeline configuration "
         + "is correct");
   }
+
+  @Override
+  public void shutdown() {
+    if (threadpool != null) {
+      threadpool.shutdown();
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
index e633b6a..e769a86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -39,6 +40,9 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
 import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
@@ -233,10 +237,10 @@ public final class RouterWebServiceUtil {
   }
 
   /**
-   * Merges a list of AppInfo grouping by ApplicationId. Our current policy
-   * is to merge the application reports from the reacheable SubClusters.
-   * Via configuration parameter, we decide whether to return applications
-   * for which the primary AM is missing or to omit them.
+   * Merges a list of AppInfo grouping by ApplicationId. Our current policy is
+   * to merge the application reports from the reacheable SubClusters. Via
+   * configuration parameter, we decide whether to return applications for which
+   * the primary AM is missing or to omit them.
    *
    * @param appsInfo a list of AppInfo to merge
    * @param returnPartialResult if the merge AppsInfo should contain partial
@@ -331,4 +335,93 @@ public final class RouterWebServiceUtil {
       am.setVcoreSeconds(am.getVcoreSeconds() + uam.getVcoreSeconds());
     }
   }
+
+  /**
+   * Deletes all the duplicate NodeInfo by discarding the old instances.
+   *
+   * @param nodes a list of NodeInfo to check for duplicates
+   * @return a NodesInfo that contains a list of NodeInfos without duplicates
+   */
+  public static NodesInfo deleteDuplicateNodesInfo(ArrayList<NodeInfo> nodes) {
+    NodesInfo nodesInfo = new NodesInfo();
+
+    Map<String, NodeInfo> nodesMap = new LinkedHashMap<>();
+    for (NodeInfo node : nodes) {
+      String nodeId = node.getNodeId();
+      // If the node already exists, it could be an old instance
+      if (nodesMap.containsKey(nodeId)) {
+        // Check if the node is an old instance
+        if (nodesMap.get(nodeId).getLastHealthUpdate() < node
+            .getLastHealthUpdate()) {
+          nodesMap.put(node.getNodeId(), node);
+        }
+      } else {
+        nodesMap.put(node.getNodeId(), node);
+      }
+    }
+    nodesInfo.addAll(new ArrayList<NodeInfo>(nodesMap.values()));
+    return nodesInfo;
+  }
+
+  /**
+   * Adds all the values from the second ClusterMetricsInfo to the first one.
+   *
+   * @param metrics the ClusterMetricsInfo we want to update
+   * @param metricsResponse the ClusterMetricsInfo we want to add to the first
+   *          param
+   */
+  public static void mergeMetrics(ClusterMetricsInfo metrics,
+      ClusterMetricsInfo metricsResponse) {
+    metrics.setAppsSubmitted(
+        metrics.getAppsSubmitted() + metricsResponse.getAppsSubmitted());
+    metrics.setAppsCompleted(
+        metrics.getAppsCompleted() + metricsResponse.getAppsCompleted());
+    metrics.setAppsPending(
+        metrics.getAppsPending() + metricsResponse.getAppsPending());
+    metrics.setAppsRunning(
+        metrics.getAppsRunning() + metricsResponse.getAppsRunning());
+    metrics.setAppsFailed(
+        metrics.getAppsFailed() + metricsResponse.getAppsFailed());
+    metrics.setAppsKilled(
+        metrics.getAppsKilled() + metricsResponse.getAppsKilled());
+
+    metrics.setReservedMB(
+        metrics.getReservedMB() + metricsResponse.getReservedMB());
+    metrics.setAvailableMB(
+        metrics.getAvailableMB() + metricsResponse.getAvailableMB());
+    metrics.setAllocatedMB(
+        metrics.getAllocatedMB() + metricsResponse.getAllocatedMB());
+
+    metrics.setReservedVirtualCores(metrics.getReservedVirtualCores()
+        + metricsResponse.getReservedVirtualCores());
+    metrics.setAvailableVirtualCores(metrics.getAvailableVirtualCores()
+        + metricsResponse.getAvailableVirtualCores());
+    metrics.setAllocatedVirtualCores(metrics.getAllocatedVirtualCores()
+        + metricsResponse.getAllocatedVirtualCores());
+
+    metrics.setContainersAllocated(metrics.getContainersAllocated()
+        + metricsResponse.getContainersAllocated());
+    metrics.setContainersReserved(metrics.getReservedContainers()
+        + metricsResponse.getReservedContainers());
+    metrics.setContainersPending(metrics.getPendingContainers()
+        + metricsResponse.getPendingContainers());
+
+    metrics.setTotalMB(metrics.getTotalMB() + metricsResponse.getTotalMB());
+    metrics.setTotalVirtualCores(
+        metrics.getTotalVirtualCores() + metrics.getTotalVirtualCores());
+    metrics.setTotalNodes(metrics.getTotalNodes() + metrics.getTotalNodes());
+    metrics.setLostNodes(metrics.getLostNodes() + metrics.getLostNodes());
+    metrics.setUnhealthyNodes(
+        metrics.getUnhealthyNodes() + metrics.getUnhealthyNodes());
+    metrics.setDecommissioningNodes(
+        metrics.getDecommissioningNodes() + metrics.getDecommissioningNodes());
+    metrics.setDecommissionedNodes(
+        metrics.getDecommissionedNodes() + metrics.getDecommissionedNodes());
+    metrics.setRebootedNodes(
+        metrics.getRebootedNodes() + metrics.getRebootedNodes());
+    metrics.setActiveNodes(metrics.getActiveNodes() + metrics.getActiveNodes());
+    metrics.setShutdownNodes(
+        metrics.getShutdownNodes() + metrics.getShutdownNodes());
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index 93527e5..6afecae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -38,7 +38,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.slf4j.Logger;
@@ -149,6 +152,46 @@ public class MockDefaultRequestInterceptorREST
     return Response.status(Status.OK).entity(ret).build();
   }
 
+  @Override
+  public NodeInfo getNode(String nodeId) {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+    NodeInfo node = new NodeInfo();
+    node.setId(nodeId);
+    node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
+    return node;
+  }
+
+  @Override
+  public NodesInfo getNodes(String states) {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+    NodeInfo node = new NodeInfo();
+    node.setId("Node " + Integer.valueOf(getSubClusterId().getId()));
+    node.setLastHealthUpdate(Integer.valueOf(getSubClusterId().getId()));
+    NodesInfo nodes = new NodesInfo();
+    nodes.add(node);
+    return nodes;
+  }
+
+  @Override
+  public ClusterMetricsInfo getClusterMetricsInfo() {
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+    ClusterMetricsInfo metrics = new ClusterMetricsInfo();
+    metrics.setAppsSubmitted(Integer.valueOf(getSubClusterId().getId()));
+    metrics.setAppsCompleted(Integer.valueOf(getSubClusterId().getId()));
+    metrics.setAppsPending(Integer.valueOf(getSubClusterId().getId()));
+    metrics.setAppsRunning(Integer.valueOf(getSubClusterId().getId()));
+    metrics.setAppsFailed(Integer.valueOf(getSubClusterId().getId()));
+    metrics.setAppsKilled(Integer.valueOf(getSubClusterId().getId()));
+
+    return metrics;
+  }
+
   public void setSubClusterId(int subClusterId) {
     setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId)));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index 2ee62af..fae4ecf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -36,7 +36,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -388,7 +391,56 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
 
     Assert.assertNotNull(responseGet);
     Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getApps().size());
-    // The merged operations will be tested in TestRouterWebServiceUtil
+    // The merged operations is tested in TestRouterWebServiceUtil
+  }
+
+  /**
+   * This test validates the correctness of GetNodes in case each subcluster
+   * provided one node with the LastHealthUpdate set to the SubClusterId. The
+   * expected result would be the NodeInfo from the last SubCluster that has
+   * LastHealthUpdate equal to Num_SubCluster -1.
+   */
+  @Test
+  public void testGetNode() {
+
+    NodeInfo responseGet = interceptor.getNode("testGetNode");
+
+    Assert.assertNotNull(responseGet);
+    Assert.assertEquals(NUM_SUBCLUSTER - 1, responseGet.getLastHealthUpdate());
+  }
+
+  /**
+   * This test validates the correctness of GetNodes in case each subcluster
+   * provided one node.
+   */
+  @Test
+  public void testGetNodes() {
+
+    NodesInfo responseGet = interceptor.getNodes(null);
+
+    Assert.assertNotNull(responseGet);
+    Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getNodes().size());
+    // The remove duplicate operations is tested in TestRouterWebServiceUtil
+  }
+
+  /**
+   * This test validates the correctness of getClusterMetricsInfo in case each
+   * SubCluster provided a ClusterMetricsInfo with appsSubmitted set to the
+   * SubClusterId. The expected result would be appSubmitted equals to the sum
+   * of SubClusterId. SubClusterId in this case is an integer.
+   */
+  @Test
+  public void testGetClusterMetrics() {
+
+    ClusterMetricsInfo responseGet = interceptor.getClusterMetricsInfo();
+
+    Assert.assertNotNull(responseGet);
+    int expectedAppSubmitted = 0;
+    for (int i = 0; i < NUM_SUBCLUSTER; i++) {
+      expectedAppSubmitted += i;
+    }
+    Assert.assertEquals(expectedAppSubmitted, responseGet.getAppsSubmitted());
+    // The merge operations is tested in TestRouterWebServiceUtil
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
index 38b1027..e7b28b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
@@ -37,9 +37,13 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
 import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
 import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -88,9 +92,9 @@ public class TestFederationInterceptorRESTRetry
     interceptor.init(user);
 
     // Create SubClusters
-    good = SubClusterId.newInstance("0");
-    bad1 = SubClusterId.newInstance("1");
-    bad2 = SubClusterId.newInstance("2");
+    good = SubClusterId.newInstance("1");
+    bad1 = SubClusterId.newInstance("2");
+    bad2 = SubClusterId.newInstance("3");
     scs.add(good);
     scs.add(bad1);
     scs.add(bad2);
@@ -316,4 +320,201 @@ public class TestFederationInterceptorRESTRetry
     Assert.assertEquals(1, response.getApps().size());
   }
 
+  /**
+   * This test validates the correctness of GetNode in case the cluster is
+   * composed of only 1 bad SubCluster.
+   */
+  @Test
+  public void testGetNodeOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+
+    setupCluster(Arrays.asList(bad2));
+    try {
+      interceptor.getNode("testGetNodeOneBadSC");
+      Assert.fail();
+    } catch (NotFoundException e) {
+      Assert.assertTrue(
+          e.getMessage().contains("nodeId, testGetNodeOneBadSC, is not found"));
+    }
+  }
+
+  /**
+   * This test validates the correctness of GetNode in case the cluster is
+   * composed of only 2 bad SubClusters.
+   */
+  @Test
+  public void testGetNodeTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    try {
+      interceptor.getNode("testGetNodeTwoBadSCs");
+      Assert.fail();
+    } catch (NotFoundException e) {
+      Assert.assertTrue(e.getMessage()
+          .contains("nodeId, testGetNodeTwoBadSCs, is not found"));
+    }
+  }
+
+  /**
+   * This test validates the correctness of GetNode in case the cluster is
+   * composed of only 1 bad SubCluster and a good one.
+   */
+  @Test
+  public void testGetNodeOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(good, bad2));
+
+    NodeInfo response = interceptor.getNode(null);
+    Assert.assertNotNull(response);
+    // Check if the only node came from Good SubCluster
+    Assert.assertEquals(good.getId(),
+        Long.toString(response.getLastHealthUpdate()));
+  }
+
+  /**
+   * This test validates the correctness of GetNodes in case the cluster is
+   * composed of only 1 bad SubCluster.
+   */
+  @Test
+  public void testGetNodesOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+
+    setupCluster(Arrays.asList(bad2));
+
+    NodesInfo response = interceptor.getNodes(null);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(0, response.getNodes().size());
+    // The remove duplicate operations is tested in TestRouterWebServiceUtil
+  }
+
+  /**
+   * This test validates the correctness of GetNodes in case the cluster is
+   * composed of only 2 bad SubClusters.
+   */
+  @Test
+  public void testGetNodesTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    NodesInfo response = interceptor.getNodes(null);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(0, response.getNodes().size());
+    // The remove duplicate operations is tested in TestRouterWebServiceUtil
+  }
+
+  /**
+   * This test validates the correctness of GetNodes in case the cluster is
+   * composed of only 1 bad SubCluster and a good one.
+   */
+  @Test
+  public void testGetNodesOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(good, bad2));
+
+    NodesInfo response = interceptor.getNodes(null);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(1, response.getNodes().size());
+    // Check if the only node came from Good SubCluster
+    Assert.assertEquals(good.getId(),
+        Long.toString(response.getNodes().get(0).getLastHealthUpdate()));
+    // The remove duplicate operations is tested in TestRouterWebServiceUtil
+  }
+
+  /**
+   * This test validates the correctness of GetNodes in case the cluster is
+   * composed of only 1 bad SubCluster. The excepted result would be a
+   * ClusterMetricsInfo with all its values set to 0.
+   */
+  @Test
+  public void testGetClusterMetricsOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad2));
+
+    ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
+    Assert.assertNotNull(response);
+    // check if we got an empty metrics
+    checkEmptyMetrics(response);
+  }
+
+  /**
+   * This test validates the correctness of GetClusterMetrics in case the
+   * cluster is composed of only 2 bad SubClusters. The excepted result would be
+   * a ClusterMetricsInfo with all its values set to 0.
+   */
+  @Test
+  public void testGetClusterMetricsTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
+    Assert.assertNotNull(response);
+    // check if we got an empty metrics
+    Assert.assertEquals(0, response.getAppsSubmitted());
+  }
+
+  /**
+   * This test validates the correctness of GetClusterMetrics in case the
+   * cluster is composed of only 1 bad SubCluster and a good one. The good
+   * SubCluster provided a ClusterMetricsInfo with appsSubmitted set to its
+   * SubClusterId. The expected result would be appSubmitted equals to its
+   * SubClusterId. SubClusterId in this case is an integer.
+   */
+  @Test
+  public void testGetClusterMetricsOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(good, bad2));
+
+    ClusterMetricsInfo response = interceptor.getClusterMetricsInfo();
+    Assert.assertNotNull(response);
+    checkMetricsFromGoodSC(response);
+    // The merge operations is tested in TestRouterWebServiceUtil
+  }
+
+  private void checkMetricsFromGoodSC(ClusterMetricsInfo response) {
+    Assert.assertEquals(Integer.parseInt(good.getId()),
+        response.getAppsSubmitted());
+    Assert.assertEquals(Integer.parseInt(good.getId()),
+        response.getAppsCompleted());
+    Assert.assertEquals(Integer.parseInt(good.getId()),
+        response.getAppsPending());
+    Assert.assertEquals(Integer.parseInt(good.getId()),
+        response.getAppsRunning());
+    Assert.assertEquals(Integer.parseInt(good.getId()),
+        response.getAppsFailed());
+    Assert.assertEquals(Integer.parseInt(good.getId()),
+        response.getAppsKilled());
+  }
+
+  private void checkEmptyMetrics(ClusterMetricsInfo response) {
+    Assert.assertEquals(0, response.getAppsSubmitted());
+    Assert.assertEquals(0, response.getAppsCompleted());
+    Assert.assertEquals(0, response.getAppsPending());
+    Assert.assertEquals(0, response.getAppsRunning());
+    Assert.assertEquals(0, response.getAppsFailed());
+    Assert.assertEquals(0, response.getAppsKilled());
+
+    Assert.assertEquals(0, response.getReservedMB());
+    Assert.assertEquals(0, response.getAvailableMB());
+    Assert.assertEquals(0, response.getAllocatedMB());
+
+    Assert.assertEquals(0, response.getReservedVirtualCores());
+    Assert.assertEquals(0, response.getAvailableVirtualCores());
+    Assert.assertEquals(0, response.getAllocatedVirtualCores());
+
+    Assert.assertEquals(0, response.getContainersAllocated());
+    Assert.assertEquals(0, response.getReservedContainers());
+    Assert.assertEquals(0, response.getPendingContainers());
+
+    Assert.assertEquals(0, response.getTotalMB());
+    Assert.assertEquals(0, response.getTotalVirtualCores());
+    Assert.assertEquals(0, response.getTotalNodes());
+    Assert.assertEquals(0, response.getLostNodes());
+    Assert.assertEquals(0, response.getUnhealthyNodes());
+    Assert.assertEquals(0, response.getDecommissioningNodes());
+    Assert.assertEquals(0, response.getDecommissionedNodes());
+    Assert.assertEquals(0, response.getRebootedNodes());
+    Assert.assertEquals(0, response.getActiveNodes());
+    Assert.assertEquals(0, response.getShutdownNodes());
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bac4e8cc/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
index 810432a..7073b3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
@@ -20,11 +20,15 @@ package org.apache.hadoop.yarn.server.router.webapp;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
 import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
 import org.junit.Assert;
@@ -40,6 +44,11 @@ public class TestRouterWebServiceUtil {
   private static final ApplicationId APPID3 = ApplicationId.newInstance(3, 1);
   private static final ApplicationId APPID4 = ApplicationId.newInstance(4, 1);
 
+  private static final String NODE1 = "Node1";
+  private static final String NODE2 = "Node2";
+  private static final String NODE3 = "Node3";
+  private static final String NODE4 = "Node4";
+
   /**
    * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
    * in case we want to merge 4 AMs. The expected result would be the same 4
@@ -308,4 +317,257 @@ public class TestRouterWebServiceUtil {
     Assert.assertNotNull(result);
     Assert.assertEquals(1, result.getApps().size());
   }
+
+  /**
+   * This test validates the correctness of
+   * RouterWebServiceUtil#deleteDuplicateNodesInfo in case we want to merge 4
+   * Nodes. The expected result would be the same 4 Nodes.
+   */
+  @Test
+  public void testDeleteDuplicate4DifferentNodes() {
+
+    NodesInfo nodes = new NodesInfo();
+
+    NodeInfo nodeInfo1 = new NodeInfo();
+    nodeInfo1.setId(NODE1);
+    nodes.add(nodeInfo1);
+
+    NodeInfo nodeInfo2 = new NodeInfo();
+    nodeInfo2.setId(NODE2);
+    nodes.add(nodeInfo2);
+
+    NodeInfo nodeInfo3 = new NodeInfo();
+    nodeInfo3.setId(NODE3);
+    nodes.add(nodeInfo3);
+
+    NodeInfo nodeInfo4 = new NodeInfo();
+    nodeInfo4.setId(NODE4);
+    nodes.add(nodeInfo4);
+
+    NodesInfo result =
+        RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
+    Assert.assertNotNull(result);
+    Assert.assertEquals(4, result.getNodes().size());
+
+    List<String> nodesIds = new ArrayList<String>();
+
+    for (NodeInfo node : result.getNodes()) {
+      nodesIds.add(node.getNodeId());
+    }
+
+    Assert.assertTrue(nodesIds.contains(NODE1));
+    Assert.assertTrue(nodesIds.contains(NODE2));
+    Assert.assertTrue(nodesIds.contains(NODE3));
+    Assert.assertTrue(nodesIds.contains(NODE4));
+  }
+
+  /**
+   * This test validates the correctness of
+   * {@link RouterWebServiceUtil#deleteDuplicateNodesInfo(ArrayList)} in case we
+   * want to merge 3 nodes with the same id. The expected result would be 1 node
+   * report with the newest healthy report.
+   */
+  @Test
+  public void testDeleteDuplicateNodes() {
+
+    NodesInfo nodes = new NodesInfo();
+
+    NodeInfo node1 = new NodeInfo();
+    node1.setId(NODE1);
+    node1.setLastHealthUpdate(0);
+    nodes.add(node1);
+
+    NodeInfo node2 = new NodeInfo();
+    node2.setId(NODE1);
+    node2.setLastHealthUpdate(1);
+    nodes.add(node2);
+
+    NodeInfo node3 = new NodeInfo();
+    node3.setId(NODE1);
+    node3.setLastHealthUpdate(2);
+    nodes.add(node3);
+
+    NodesInfo result =
+        RouterWebServiceUtil.deleteDuplicateNodesInfo(nodes.getNodes());
+    Assert.assertNotNull(result);
+    Assert.assertEquals(1, result.getNodes().size());
+
+    NodeInfo node = result.getNodes().get(0);
+
+    Assert.assertEquals(NODE1, node.getNodeId());
+    Assert.assertEquals(2, node.getLastHealthUpdate());
+  }
+
+  /**
+   * This test validates the correctness of
+   * {@link RouterWebServiceUtil#mergeMetrics}.
+   */
+  @Test
+  public void testMergeMetrics() {
+    ClusterMetricsInfo metrics = new ClusterMetricsInfo();
+    ClusterMetricsInfo metricsResponse = new ClusterMetricsInfo();
+
+    setUpClusterMetrics(metrics);
+    setUpClusterMetrics(metricsResponse);
+    ClusterMetricsInfo metricsClone = createClusterMetricsClone(metrics);
+    RouterWebServiceUtil.mergeMetrics(metrics, metricsResponse);
+
+    Assert.assertEquals(
+        metricsResponse.getAppsSubmitted() + metricsClone.getAppsSubmitted(),
+        metrics.getAppsSubmitted());
+    Assert.assertEquals(
+        metricsResponse.getAppsCompleted() + metricsClone.getAppsCompleted(),
+        metrics.getAppsCompleted());
+    Assert.assertEquals(
+        metricsResponse.getAppsPending() + metricsClone.getAppsPending(),
+        metrics.getAppsPending());
+    Assert.assertEquals(
+        metricsResponse.getAppsRunning() + metricsClone.getAppsRunning(),
+        metrics.getAppsRunning());
+    Assert.assertEquals(
+        metricsResponse.getAppsFailed() + metricsClone.getAppsFailed(),
+        metrics.getAppsFailed());
+    Assert.assertEquals(
+        metricsResponse.getAppsKilled() + metricsClone.getAppsKilled(),
+        metrics.getAppsKilled());
+
+    Assert.assertEquals(
+        metricsResponse.getReservedMB() + metricsClone.getReservedMB(),
+        metrics.getReservedMB());
+    Assert.assertEquals(
+        metricsResponse.getAvailableMB() + metricsClone.getAvailableMB(),
+        metrics.getAvailableMB());
+    Assert.assertEquals(
+        metricsResponse.getAllocatedMB() + metricsClone.getAllocatedMB(),
+        metrics.getAllocatedMB());
+
+    Assert.assertEquals(
+        metricsResponse.getReservedVirtualCores()
+            + metricsClone.getReservedVirtualCores(),
+        metrics.getReservedVirtualCores());
+    Assert.assertEquals(
+        metricsResponse.getAvailableVirtualCores()
+            + metricsClone.getAvailableVirtualCores(),
+        metrics.getAvailableVirtualCores());
+    Assert.assertEquals(
+        metricsResponse.getAllocatedVirtualCores()
+            + metricsClone.getAllocatedVirtualCores(),
+        metrics.getAllocatedVirtualCores());
+
+    Assert.assertEquals(
+        metricsResponse.getContainersAllocated()
+            + metricsClone.getContainersAllocated(),
+        metrics.getContainersAllocated());
+    Assert.assertEquals(
+        metricsResponse.getReservedContainers()
+            + metricsClone.getReservedContainers(),
+        metrics.getReservedContainers());
+    Assert.assertEquals(
+        metricsResponse.getPendingContainers()
+            + metricsClone.getPendingContainers(),
+        metrics.getPendingContainers());
+
+    Assert.assertEquals(
+        metricsResponse.getTotalMB() + metricsClone.getTotalMB(),
+        metrics.getTotalMB());
+    Assert.assertEquals(
+        metricsResponse.getTotalVirtualCores()
+            + metricsClone.getTotalVirtualCores(),
+        metrics.getTotalVirtualCores());
+    Assert.assertEquals(
+        metricsResponse.getTotalNodes() + metricsClone.getTotalNodes(),
+        metrics.getTotalNodes());
+    Assert.assertEquals(
+        metricsResponse.getLostNodes() + metricsClone.getLostNodes(),
+        metrics.getLostNodes());
+    Assert.assertEquals(
+        metricsResponse.getUnhealthyNodes() + metricsClone.getUnhealthyNodes(),
+        metrics.getUnhealthyNodes());
+    Assert.assertEquals(
+        metricsResponse.getDecommissioningNodes()
+            + metricsClone.getDecommissioningNodes(),
+        metrics.getDecommissioningNodes());
+    Assert.assertEquals(
+        metricsResponse.getDecommissionedNodes()
+            + metricsClone.getDecommissionedNodes(),
+        metrics.getDecommissionedNodes());
+    Assert.assertEquals(
+        metricsResponse.getRebootedNodes() + metricsClone.getRebootedNodes(),
+        metrics.getRebootedNodes());
+    Assert.assertEquals(
+        metricsResponse.getActiveNodes() + metricsClone.getActiveNodes(),
+        metrics.getActiveNodes());
+    Assert.assertEquals(
+        metricsResponse.getShutdownNodes() + metricsClone.getShutdownNodes(),
+        metrics.getShutdownNodes());
+  }
+
+  private ClusterMetricsInfo createClusterMetricsClone(
+      ClusterMetricsInfo metrics) {
+    ClusterMetricsInfo metricsClone = new ClusterMetricsInfo();
+    metricsClone.setAppsSubmitted(metrics.getAppsSubmitted());
+    metricsClone.setAppsCompleted(metrics.getAppsCompleted());
+    metricsClone.setAppsPending(metrics.getAppsPending());
+    metricsClone.setAppsRunning(metrics.getAppsRunning());
+    metricsClone.setAppsFailed(metrics.getAppsFailed());
+    metricsClone.setAppsKilled(metrics.getAppsKilled());
+
+    metricsClone.setReservedMB(metrics.getReservedMB());
+    metricsClone.setAvailableMB(metrics.getAvailableMB());
+    metricsClone.setAllocatedMB(metrics.getAllocatedMB());
+
+    metricsClone.setReservedVirtualCores(metrics.getReservedVirtualCores());
+    metricsClone.setAvailableVirtualCores(metrics.getAvailableVirtualCores());
+    metricsClone.setAllocatedVirtualCores(metrics.getAllocatedVirtualCores());
+
+    metricsClone.setContainersAllocated(metrics.getContainersAllocated());
+    metricsClone.setContainersReserved(metrics.getReservedContainers());
+    metricsClone.setContainersPending(metrics.getPendingContainers());
+
+    metricsClone.setTotalMB(metrics.getTotalMB());
+    metricsClone.setTotalVirtualCores(metrics.getTotalVirtualCores());
+    metricsClone.setTotalNodes(metrics.getTotalNodes());
+    metricsClone.setLostNodes(metrics.getLostNodes());
+    metricsClone.setUnhealthyNodes(metrics.getUnhealthyNodes());
+    metricsClone.setDecommissioningNodes(metrics.getDecommissioningNodes());
+    metricsClone.setDecommissionedNodes(metrics.getDecommissionedNodes());
+    metricsClone.setRebootedNodes(metrics.getRebootedNodes());
+    metricsClone.setActiveNodes(metrics.getActiveNodes());
+    metricsClone.setShutdownNodes(metrics.getShutdownNodes());
+    return metricsClone;
+
+  }
+
+  private void setUpClusterMetrics(ClusterMetricsInfo metrics) {
+    Random rand = new Random(System.currentTimeMillis());
+    metrics.setAppsSubmitted(rand.nextInt(1000));
+    metrics.setAppsCompleted(rand.nextInt(1000));
+    metrics.setAppsPending(rand.nextInt(1000));
+    metrics.setAppsRunning(rand.nextInt(1000));
+    metrics.setAppsFailed(rand.nextInt(1000));
+    metrics.setAppsKilled(rand.nextInt(1000));
+
+    metrics.setReservedMB(rand.nextInt(1000));
+    metrics.setAvailableMB(rand.nextInt(1000));
+    metrics.setAllocatedMB(rand.nextInt(1000));
+
+    metrics.setReservedVirtualCores(rand.nextInt(1000));
+    metrics.setAvailableVirtualCores(rand.nextInt(1000));
+    metrics.setAllocatedVirtualCores(rand.nextInt(1000));
+
+    metrics.setContainersAllocated(rand.nextInt(1000));
+    metrics.setContainersReserved(rand.nextInt(1000));
+    metrics.setContainersPending(rand.nextInt(1000));
+
+    metrics.setTotalMB(rand.nextInt(1000));
+    metrics.setTotalVirtualCores(rand.nextInt(1000));
+    metrics.setTotalNodes(rand.nextInt(1000));
+    metrics.setLostNodes(rand.nextInt(1000));
+    metrics.setUnhealthyNodes(rand.nextInt(1000));
+    metrics.setDecommissioningNodes(rand.nextInt(1000));
+    metrics.setDecommissionedNodes(rand.nextInt(1000));
+    metrics.setRebootedNodes(rand.nextInt(1000));
+    metrics.setActiveNodes(rand.nextInt(1000));
+    metrics.setShutdownNodes(rand.nextInt(1000));
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org