You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/09/22 01:20:06 UTC

[1/8] hadoop git commit: YARN-6970. Add PoolInitializationException as retriable exception in FederationFacade. (Giovanni Matteo Fumarola via Subru).

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 d11be2dca -> 7cd9018b1


YARN-6970. Add PoolInitializationException as retriable exception in FederationFacade. (Giovanni Matteo Fumarola via Subru).

(cherry picked from commit ad2a3506626728a6be47af0db3ca60610a568734)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a1ee4ad7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a1ee4ad7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a1ee4ad7

Branch: refs/heads/branch-2
Commit: a1ee4ad77f964e43ff8005729327d2a0fed6fa04
Parents: d11be2d
Author: Subru Krishnan <su...@apache.org>
Authored: Tue Aug 8 16:48:29 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:08:30 2017 -0700

----------------------------------------------------------------------
 .../utils/FederationStateStoreFacade.java       |  2 ++
 .../TestFederationStateStoreFacadeRetry.java    | 24 ++++++++++++++++++++
 2 files changed, 26 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1ee4ad7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 389c769..682eb14 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -70,6 +70,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException;
 
 /**
  *
@@ -162,6 +163,7 @@ public final class FederationStateStoreFacade {
     exceptionToPolicyMap.put(FederationStateStoreRetriableException.class,
         basePolicy);
     exceptionToPolicyMap.put(CacheLoaderException.class, basePolicy);
+    exceptionToPolicyMap.put(PoolInitializationException.class, basePolicy);
 
     RetryPolicy retryPolicy = RetryPolicies.retryByException(
         RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a1ee4ad7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
index 304910e..ea43268 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateS
 import org.junit.Assert;
 import org.junit.Test;
 
+import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException;
+
 /**
  * Test class to validate FederationStateStoreFacade retry policy.
  */
@@ -119,4 +121,26 @@ public class TestFederationStateStoreFacadeRetry {
         policy.shouldRetry(new CacheLoaderException(""), maxRetries, 0, false);
     Assert.assertEquals(RetryAction.FAIL.action, action.action);
   }
+
+  /*
+   * Test to validate that PoolInitializationException is a retriable exception.
+   */
+  @Test
+  public void testFacadePoolInitRetriableException() throws Exception {
+    // PoolInitializationException is a retriable exception
+    conf = new Configuration();
+    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
+    RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
+    RetryAction action = policy.shouldRetry(
+        new PoolInitializationException(new YarnException()), 0, 0, false);
+    // We compare only the action; the delay and the reason are random values
+    // during this test
+    Assert.assertEquals(RetryAction.RETRY.action, action.action);
+
+    // After maxRetries we stop retrying
+    action =
+        policy.shouldRetry(new PoolInitializationException(new YarnException()),
+            maxRetries, 0, false);
+    Assert.assertEquals(RetryAction.FAIL.action, action.action);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[3/8] hadoop git commit: YARN-6996. Change javax.cache library implementation from JSR107 to Apache Geronimo. (Ray Chiang via Subru).

Posted by cu...@apache.org.
YARN-6996. Change javax.cache library implementation from JSR107 to Apache Geronimo. (Ray Chiang via Subru).

(cherry picked from commit 18f3603bce37e0e07c9075811b1179afc2c227eb)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9ad067ef
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9ad067ef
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9ad067ef

Branch: refs/heads/branch-2
Commit: 9ad067efe95b738f7d7ed886d94121d5b806be96
Parents: 8220b19
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Aug 14 11:10:00 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:09:16 2017 -0700

----------------------------------------------------------------------
 hadoop-project/pom.xml                                         | 6 +++---
 .../hadoop-yarn-server/hadoop-yarn-server-common/pom.xml       | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ad067ef/hadoop-project/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 6b24a75..9918af5 100755
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -88,7 +88,7 @@
     <!-- Required for testing LDAP integration -->
     <apacheds.version>2.0.0-M15</apacheds.version>
 
-    <jcache.version>1.0.0</jcache.version>
+    <jcache.version>1.0-alpha-1</jcache.version>
     <ehcache.version>3.3.1</ehcache.version>
     <hikari.version>2.4.12</hikari.version>
     <mssql.version>6.2.1.jre7</mssql.version>
@@ -1086,8 +1086,8 @@
             <version>1.3.0</version>
         </dependency>
         <dependency>
-          <groupId>javax.cache</groupId>
-          <artifactId>cache-api</artifactId>
+          <groupId>org.apache.geronimo.specs</groupId>
+          <artifactId>geronimo-jcache_1.0_spec</artifactId>
           <version>${jcache.version}</version>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9ad067ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 7722f4f..8c754fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -110,8 +110,8 @@
       <artifactId>leveldbjni-all</artifactId>
     </dependency>
     <dependency>
-      <groupId>javax.cache</groupId>
-      <artifactId>cache-api</artifactId>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jcache_1.0_spec</artifactId>
     </dependency>
     <dependency>
       <groupId>org.ehcache</groupId>


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[7/8] hadoop git commit: YARN-7010. Federation: routing REST invocations transparently to multiple RMs (part 2 - getApps). (Contributed by Giovanni Matteo Fumarola via curino)

Posted by cu...@apache.org.
YARN-7010. Federation: routing REST invocations transparently to multiple RMs (part 2 - getApps). (Contributed by Giovanni Matteo Fumarola via curino)

(cherry picked from commit cc8893edc0b7960e958723c81062986c12f06ade)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/88b32edb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/88b32edb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/88b32edb

Branch: refs/heads/branch-2
Commit: 88b32edb8fb49bc87e5e56f4cce28c8358eae398
Parents: 2aacb9d
Author: Carlo Curino <cu...@apache.org>
Authored: Tue Aug 29 14:53:09 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:09:44 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   9 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   2 +
 .../server/uam/UnmanagedApplicationManager.java |   2 +-
 .../resourcemanager/webapp/dao/AppInfo.java     | 184 +++++++----
 .../resourcemanager/webapp/dao/AppsInfo.java    |   4 +
 .../yarn/server/router/RouterMetrics.java       |  33 ++
 .../webapp/FederationInterceptorREST.java       | 118 ++++++-
 .../router/webapp/RouterWebServiceUtil.java     | 109 ++++++-
 .../yarn/server/router/TestRouterMetrics.java   |  50 +++
 .../MockDefaultRequestInterceptorREST.java      |  49 ++-
 .../webapp/TestFederationInterceptorREST.java   |  17 +
 .../TestFederationInterceptorRESTRetry.java     |  45 +++
 .../router/webapp/TestRouterWebServiceUtil.java | 311 +++++++++++++++++++
 13 files changed, 855 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index bf18ade..0f05f1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2671,6 +2671,15 @@ public class YarnConfiguration extends Configuration {
       "org.apache.hadoop.yarn.server.router.webapp."
           + "DefaultRequestInterceptorREST";
 
+  /**
+   * Whether the interceptor used in FederationInterceptorREST should return
+   * partial AppReports. Disabled by default.
+   */
+  public static final String ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
+      ROUTER_WEBAPP_PREFIX + "partial-result.enabled";
+  public static final boolean DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED =
+      false;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index ad38051..6cb92f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -158,6 +158,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
 
     configurationPrefixToSkipCompare
         .add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
 
     // Set by container-executor.cfg
     configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
index 60a9a27..6531a75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java
@@ -83,7 +83,7 @@ public class UnmanagedApplicationManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(UnmanagedApplicationManager.class);
   private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000;
-  private static final String APP_NAME = "UnmanagedAM";
+  public static final String APP_NAME = "UnmanagedAM";
   private static final String DEFAULT_QUEUE_CONFIG = "uam.default.queue.name";
 
   private BlockingQueue<AsyncAllocateRequestInfo> requestQueue;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
index 6f7763e..af1b2fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppInfo.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.DeSelectFields.DeSel
 import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Joiner;
 
 @XmlRootElement(name = "app")
@@ -71,9 +72,9 @@ public class AppInfo {
   // these are ok for any user to see
   protected String id;
   protected String user;
-  protected String name;
+  private String name;
   protected String queue;
-  protected YarnApplicationState state;
+  private YarnApplicationState state;
   protected FinalApplicationStatus finalStatus;
   protected float progress;
   protected String trackingUI;
@@ -91,21 +92,21 @@ public class AppInfo {
   protected String amContainerLogs;
   protected String amHostHttpAddress;
   private String amRPCAddress;
-  protected long allocatedMB;
-  protected long allocatedVCores;
-  protected long reservedMB;
-  protected long reservedVCores;
-  protected int runningContainers;
-  protected long memorySeconds;
-  protected long vcoreSeconds;
+  private long allocatedMB;
+  private long allocatedVCores;
+  private long reservedMB;
+  private long reservedVCores;
+  private int runningContainers;
+  private long memorySeconds;
+  private long vcoreSeconds;
   protected float queueUsagePercentage;
   protected float clusterUsagePercentage;
 
   // preemption info fields
-  protected long preemptedResourceMB;
-  protected long preemptedResourceVCores;
-  protected int numNonAMContainerPreempted;
-  protected int numAMContainerPreempted;
+  private long preemptedResourceMB;
+  private long preemptedResourceVCores;
+  private int numNonAMContainerPreempted;
+  private int numAMContainerPreempted;
   private long preemptedMemorySeconds;
   private long preemptedVcoreSeconds;
 
@@ -141,12 +142,11 @@ public class AppInfo {
           || YarnApplicationState.NEW_SAVING == this.state
           || YarnApplicationState.SUBMITTED == this.state
           || YarnApplicationState.ACCEPTED == this.state;
-      this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED" : (app
-          .getFinishTime() == 0 ? "ApplicationMaster" : "History");
+      this.trackingUI = this.trackingUrlIsNotReady ? "UNASSIGNED"
+          : (app.getFinishTime() == 0 ? "ApplicationMaster" : "History");
       if (!trackingUrlIsNotReady) {
         this.trackingUrl =
-            WebAppUtils.getURLWithScheme(schemePrefix,
-                trackingUrl);
+            WebAppUtils.getURLWithScheme(schemePrefix, trackingUrl);
         this.trackingUrlPretty = this.trackingUrl;
       } else {
         this.trackingUrlPretty = "UNASSIGNED";
@@ -161,15 +161,15 @@ public class AppInfo {
       this.priority = 0;
 
       if (app.getApplicationPriority() != null) {
-        this.priority = app.getApplicationPriority()
-            .getPriority();
+        this.priority = app.getApplicationPriority().getPriority();
       }
       this.progress = app.getProgress() * 100;
       this.diagnostics = app.getDiagnostics().toString();
       if (diagnostics == null || diagnostics.isEmpty()) {
         this.diagnostics = "";
       }
-      if (app.getApplicationTags() != null && !app.getApplicationTags().isEmpty()) {
+      if (app.getApplicationTags() != null
+          && !app.getApplicationTags().isEmpty()) {
         this.applicationTags = Joiner.on(',').join(app.getApplicationTags());
       }
       this.finalStatus = app.getFinalApplicationStatus();
@@ -177,8 +177,8 @@ public class AppInfo {
       if (hasAccess) {
         this.startedTime = app.getStartTime();
         this.finishedTime = app.getFinishTime();
-        this.elapsedTime = Times.elapsed(app.getStartTime(),
-            app.getFinishTime());
+        this.elapsedTime =
+            Times.elapsed(app.getStartTime(), app.getFinishTime());
         this.logAggregationStatus = app.getLogAggregationStatusForAppReport();
         RMAppAttempt attempt = app.getCurrentAppAttempt();
         if (attempt != null) {
@@ -193,8 +193,8 @@ public class AppInfo {
 
           this.amRPCAddress = getAmRPCAddressFromRMAppAttempt(attempt);
 
-          ApplicationResourceUsageReport resourceReport = attempt
-              .getApplicationResourceUsageReport();
+          ApplicationResourceUsageReport resourceReport =
+              attempt.getApplicationResourceUsageReport();
           if (resourceReport != null) {
             Resource usedResources = resourceReport.getUsedResources();
             Resource reservedResources = resourceReport.getReservedResources();
@@ -207,10 +207,11 @@ public class AppInfo {
             clusterUsagePercentage = resourceReport.getClusterUsagePercentage();
           }
 
-          /* When the deSelects parameter contains "resourceRequests",
-             it skips returning massive ResourceRequest objects and vice versa.
-             Default behavior is no skipping. (YARN-6280)
-          */
+          /*
+           * When the deSelects parameter contains "resourceRequests", it skips
+           * returning massive ResourceRequest objects and vice versa. Default
+           * behavior is no skipping. (YARN-6280)
+           */
           if (!deSelects.contains(DeSelectType.RESOURCE_REQUESTS)) {
             List<ResourceRequest> resourceRequestsRaw = rm.getRMContext()
               .getScheduler()
@@ -226,12 +227,9 @@ public class AppInfo {
 
       // copy preemption info fields
       RMAppMetrics appMetrics = app.getRMAppMetrics();
-      numAMContainerPreempted =
-          appMetrics.getNumAMContainersPreempted();
-      preemptedResourceMB =
-          appMetrics.getResourcePreempted().getMemorySize();
-      numNonAMContainerPreempted =
-          appMetrics.getNumNonAMContainersPreempted();
+      numAMContainerPreempted = appMetrics.getNumAMContainersPreempted();
+      preemptedResourceMB = appMetrics.getResourcePreempted().getMemorySize();
+      numNonAMContainerPreempted = appMetrics.getNumNonAMContainersPreempted();
       preemptedResourceVCores =
           appMetrics.getResourcePreempted().getVirtualCores();
       memorySeconds = appMetrics.getMemorySeconds();
@@ -240,8 +238,7 @@ public class AppInfo {
       preemptedVcoreSeconds = appMetrics.getPreemptedVcoreSeconds();
       ApplicationSubmissionContext appSubmissionContext =
           app.getApplicationSubmissionContext();
-      unmanagedApplication =
-          appSubmissionContext.getUnmanagedAM();
+      unmanagedApplication = appSubmissionContext.getUnmanagedAM();
       appNodeLabelExpression =
           app.getApplicationSubmissionContext().getNodeLabelExpression();
       amNodeLabelExpression = (unmanagedApplication) ? null
@@ -284,6 +281,7 @@ public class AppInfo {
           timeouts.add(timeout);
         }
       }
+
     }
   }
 
@@ -394,19 +392,19 @@ public class AppInfo {
   public String getApplicationTags() {
     return this.applicationTags;
   }
-  
+
   public int getRunningContainers() {
     return this.runningContainers;
   }
-  
+
   public long getAllocatedMB() {
     return this.allocatedMB;
   }
-  
+
   public long getAllocatedVCores() {
     return this.allocatedVCores;
   }
-  
+
   public long getReservedMB() {
     return this.reservedMB;
   }
@@ -415,22 +413,6 @@ public class AppInfo {
     return this.reservedVCores;
   }
 
-  public long getPreemptedMB() {
-    return preemptedResourceMB;
-  }
-
-  public long getPreemptedVCores() {
-    return preemptedResourceVCores;
-  }
-
-  public int getNumNonAMContainersPreempted() {
-    return numNonAMContainerPreempted;
-  }
-  
-  public int getNumAMContainersPreempted() {
-    return numAMContainerPreempted;
-  }
- 
   public long getMemorySeconds() {
     return memorySeconds;
   }
@@ -446,10 +428,15 @@ public class AppInfo {
   public long getPreemptedVcoreSeconds() {
     return preemptedVcoreSeconds;
   }
+
   public List<ResourceRequestInfo> getResourceRequests() {
     return this.resourceRequests;
   }
 
+  public void setResourceRequests(List<ResourceRequestInfo> resourceRequests) {
+    this.resourceRequests = resourceRequests;
+  }
+
   public LogAggregationStatus getLogAggregationStatus() {
     return this.logAggregationStatus;
   }
@@ -473,4 +460,89 @@ public class AppInfo {
   public ResourcesInfo getResourceInfo() {
     return resourceInfo;
   }
+
+  public long getPreemptedResourceMB() {
+    return preemptedResourceMB;
+  }
+
+  public void setPreemptedResourceMB(long preemptedResourceMB) {
+    this.preemptedResourceMB = preemptedResourceMB;
+  }
+
+  public long getPreemptedResourceVCores() {
+    return preemptedResourceVCores;
+  }
+
+  public void setPreemptedResourceVCores(long preemptedResourceVCores) {
+    this.preemptedResourceVCores = preemptedResourceVCores;
+  }
+
+  public int getNumNonAMContainerPreempted() {
+    return numNonAMContainerPreempted;
+  }
+
+  public void setNumNonAMContainerPreempted(int numNonAMContainerPreempted) {
+    this.numNonAMContainerPreempted = numNonAMContainerPreempted;
+  }
+
+  public int getNumAMContainerPreempted() {
+    return numAMContainerPreempted;
+  }
+
+  public void setNumAMContainerPreempted(int numAMContainerPreempted) {
+    this.numAMContainerPreempted = numAMContainerPreempted;
+  }
+
+  public void setPreemptedMemorySeconds(long preemptedMemorySeconds) {
+    this.preemptedMemorySeconds = preemptedMemorySeconds;
+  }
+
+  public void setPreemptedVcoreSeconds(long preemptedVcoreSeconds) {
+    this.preemptedVcoreSeconds = preemptedVcoreSeconds;
+  }
+
+  public void setAllocatedMB(long allocatedMB) {
+    this.allocatedMB = allocatedMB;
+  }
+
+  public void setAllocatedVCores(long allocatedVCores) {
+    this.allocatedVCores = allocatedVCores;
+  }
+
+  public void setReservedMB(long reservedMB) {
+    this.reservedMB = reservedMB;
+  }
+
+  public void setReservedVCores(long reservedVCores) {
+    this.reservedVCores = reservedVCores;
+  }
+
+  public void setRunningContainers(int runningContainers) {
+    this.runningContainers = runningContainers;
+  }
+
+  public void setMemorySeconds(long memorySeconds) {
+    this.memorySeconds = memorySeconds;
+  }
+
+  public void setVcoreSeconds(long vcoreSeconds) {
+    this.vcoreSeconds = vcoreSeconds;
+  }
+
+  public void setAppId(String appId) {
+    this.id = appId;
+  }
+
+  @VisibleForTesting
+  public void setAMHostHttpAddress(String amHost) {
+    this.amHostHttpAddress = amHost;
+  }
+
+  public void setState(YarnApplicationState state) {
+    this.state = state;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java
index 84f68f1..39837b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/AppsInfo.java
@@ -40,4 +40,8 @@ public class AppsInfo {
     return app;
   }
 
+  public void addAll(ArrayList<AppInfo> appsInfo) {
+    app.addAll(appsInfo);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
index 42361a3..6d75471 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -49,6 +49,8 @@ public final class RouterMetrics {
   private MutableGaugeInt numAppsFailedKilled;
   @Metric("# of application reports failed to be retrieved")
   private MutableGaugeInt numAppsFailedRetrieved;
+  @Metric("# of multiple applications reports failed to be retrieved")
+  private MutableGaugeInt numMultipleAppsFailedRetrieved;
 
   // Aggregate metrics are shared, and don't have to be looked up per call
   @Metric("Total number of successful Submitted apps and latency(ms)")
@@ -59,6 +61,9 @@ public final class RouterMetrics {
   private MutableRate totalSucceededAppsCreated;
   @Metric("Total number of successful Retrieved app reports and latency(ms)")
   private MutableRate totalSucceededAppsRetrieved;
+  @Metric("Total number of successful Retrieved multiple apps reports and "
+      + "latency(ms)")
+  private MutableRate totalSucceededMultipleAppsRetrieved;
 
   /**
    * Provide quantile counters for all latencies.
@@ -67,6 +72,7 @@ public final class RouterMetrics {
   private MutableQuantiles getNewApplicationLatency;
   private MutableQuantiles killApplicationLatency;
   private MutableQuantiles getApplicationReportLatency;
+  private MutableQuantiles getApplicationsReportLatency;
 
   private static volatile RouterMetrics INSTANCE = null;
   private static MetricsRegistry registry;
@@ -83,6 +89,9 @@ public final class RouterMetrics {
     getApplicationReportLatency =
         registry.newQuantiles("getApplicationReportLatency",
             "latency of get application report", "ops", "latency", 10);
+    getApplicationsReportLatency =
+        registry.newQuantiles("getApplicationsReportLatency",
+            "latency of get applications report", "ops", "latency", 10);
   }
 
   public static RouterMetrics getMetrics() {
@@ -125,6 +134,11 @@ public final class RouterMetrics {
   }
 
   @VisibleForTesting
+  public long getNumSucceededMultipleAppsRetrieved() {
+    return totalSucceededMultipleAppsRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
   public double getLatencySucceededAppsCreated() {
     return totalSucceededAppsCreated.lastStat().mean();
   }
@@ -145,6 +159,11 @@ public final class RouterMetrics {
   }
 
   @VisibleForTesting
+  public double getLatencySucceededMultipleGetAppReport() {
+    return totalSucceededMultipleAppsRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
   public int getAppsFailedCreated() {
     return numAppsFailedCreated.value();
   }
@@ -164,6 +183,11 @@ public final class RouterMetrics {
     return numAppsFailedRetrieved.value();
   }
 
+  @VisibleForTesting
+  public int getMultipleAppsFailedRetrieved() {
+    return numMultipleAppsFailedRetrieved.value();
+  }
+
   public void succeededAppsCreated(long duration) {
     totalSucceededAppsCreated.add(duration);
     getNewApplicationLatency.add(duration);
@@ -184,6 +208,11 @@ public final class RouterMetrics {
     getApplicationReportLatency.add(duration);
   }
 
+  /**
+   * Records a successful retrieval of multiple applications. The duration is
+   * added both to the {@code totalSucceededMultipleAppsRetrieved} rate and to
+   * the {@code getApplicationsReportLatency} quantiles.
+   *
+   * @param duration the time taken by the call, as measured by the caller
+   */
+  public void succeededMultipleAppsRetrieved(long duration) {
+    totalSucceededMultipleAppsRetrieved.add(duration);
+    getApplicationsReportLatency.add(duration);
+  }
+
   public void incrAppsFailedCreated() {
     numAppsFailedCreated.incr();
   }
@@ -200,4 +229,8 @@ public final class RouterMetrics {
     numAppsFailedRetrieved.incr();
   }
 
+  /**
+   * Increments the counter of failed attempts to retrieve multiple
+   * applications.
+   */
+  public void incrMultipleAppsFailedRetrieved() {
+    numMultipleAppsFailedRetrieved.incr();
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 4c7d4b1..3a91e35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -25,6 +25,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -102,9 +107,15 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   private RouterPolicyFacade policyFacade;
   private RouterMetrics routerMetrics;
   private final Clock clock = new MonotonicClock();
+  private boolean returnPartialReport;
 
   private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
 
+  /**
+   * Thread pool used for asynchronous operations.
+   */
+  private ExecutorService threadpool;
+
   @Override
   public void init(String user) {
     federationFacade = FederationStateStoreFacade.getInstance();
@@ -125,6 +136,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
     routerMetrics = RouterMetrics.getMetrics();
+    threadpool = Executors.newCachedThreadPool();
+
+    returnPartialReport =
+        conf.getBoolean(YarnConfiguration.ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED,
+            YarnConfiguration.DEFAULT_ROUTER_WEBAPP_PARTIAL_RESULTS_ENABLED);
   }
 
   private SubClusterId getRandomActiveSubCluster(
@@ -586,6 +602,99 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     return response;
   }
 
+  /**
+   * The Yarn Router will forward the request to all the Yarn RMs in parallel,
+   * after that it will group all the ApplicationReports by the ApplicationId.
+   * <p>
+   * Possible failure:
+   * <p>
+   * Client: identical behavior as {@code RMWebServices}.
+   * <p>
+   * Router: the Client will timeout and resubmit the request.
+   * <p>
+   * ResourceManager: the Router calls each Yarn RM in parallel by using one
+   * thread for each Yarn RM. In case a Yarn RM fails, a single call will
+   * timeout. However the Router will merge the ApplicationReports it got, and
+   * provides a partial list to the client.
+   * <p>
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   * <p>
+   * All the query parameters are forwarded unchanged to every active
+   * sub-cluster.
+   *
+   * @return the merged applications of the sub-clusters that answered, or
+   *         {@code null} if the active sub-clusters cannot be retrieved or
+   *         no application has been collected
+   */
+  @Override
+  public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
+      Set<String> statesQuery, String finalStatusQuery, String userQuery,
+      String queueQuery, String count, String startedBegin, String startedEnd,
+      String finishBegin, String finishEnd, Set<String> applicationTypes,
+      Set<String> applicationTags, Set<String> unselectedFields) {
+    AppsInfo apps = new AppsInfo();
+    long startTime = clock.getTime();
+
+    Map<SubClusterId, SubClusterInfo> subClustersActive = null;
+    try {
+      subClustersActive = federationFacade.getSubClusters(true);
+    } catch (YarnException e) {
+      routerMetrics.incrMultipleAppsFailedRetrieved();
+      // Keep a trace of the root cause instead of failing silently
+      LOG.error("Failed to retrieve the active subclusters ", e);
+      return null;
+    }
+
+    // Send the requests in parallel
+
+    ExecutorCompletionService<AppsInfo> compSvc =
+        new ExecutorCompletionService<AppsInfo>(this.threadpool);
+
+    for (final SubClusterInfo info : subClustersActive.values()) {
+      compSvc.submit(new Callable<AppsInfo>() {
+        @Override
+        public AppsInfo call() {
+          DefaultRequestInterceptorREST interceptor =
+              getOrCreateInterceptorForSubCluster(info.getSubClusterId(),
+                  info.getClientRMServiceAddress());
+          AppsInfo rmApps = interceptor.getApps(hsr, stateQuery, statesQuery,
+              finalStatusQuery, userQuery, queueQuery, count, startedBegin,
+              startedEnd, finishBegin, finishEnd, applicationTypes,
+              applicationTags, unselectedFields);
+
+          if (rmApps == null) {
+            routerMetrics.incrMultipleAppsFailedRetrieved();
+            LOG.error("Subcluster " + info.getSubClusterId()
+                + " failed to return appReport.");
+            return null;
+          }
+          return rmApps;
+        }
+      });
+    }
+
+    // Collect the responses in completion order, one per submitted request
+
+    for (int i = 0; i < subClustersActive.size(); i++) {
+      try {
+        Future<AppsInfo> future = compSvc.take();
+        AppsInfo appsResponse = future.get();
+
+        // A null response has already been counted as a failure by the task
+        // itself, so it must not be recorded as a success as well.
+        if (appsResponse != null) {
+          long stopTime = clock.getTime();
+          routerMetrics.succeededMultipleAppsRetrieved(stopTime - startTime);
+          apps.addAll(appsResponse.getApps());
+        }
+      } catch (InterruptedException e) {
+        routerMetrics.incrMultipleAppsFailedRetrieved();
+        LOG.warn("Interrupted while waiting for the application reports ", e);
+        // Restore the interrupt status for the caller and stop waiting
+        Thread.currentThread().interrupt();
+        break;
+      } catch (Throwable e) {
+        routerMetrics.incrMultipleAppsFailedRetrieved();
+        LOG.warn("Failed to get application report ", e);
+      }
+    }
+
+    if (apps.getApps().isEmpty()) {
+      return null;
+    }
+
+    // Merge all the application reports got from all the available Yarn RMs
+
+    return RouterWebServiceUtil.mergeAppsInfo(apps.getApps(),
+        returnPartialReport);
+  }
+
   @Override
   public ClusterInfo get() {
     return getClusterInfo();
@@ -640,15 +749,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   }
 
   @Override
-  public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
-      Set<String> statesQuery, String finalStatusQuery, String userQuery,
-      String queueQuery, String count, String startedBegin, String startedEnd,
-      String finishBegin, String finishEnd, Set<String> applicationTypes,
-      Set<String> applicationTags, Set<String> unselectedFields) {
-    throw new NotImplementedException();
-  }
-
-  @Override
   public AppState getAppState(HttpServletRequest hsr, String appId)
       throws AuthorizationException {
     throw new NotImplementedException();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
index 1c4332e..cf08c33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/RouterWebServiceUtil.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.router.webapp;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -33,7 +35,11 @@ import javax.ws.rs.core.Response.ResponseBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
@@ -55,6 +61,8 @@ public final class RouterWebServiceUtil {
   private static final Log LOG =
       LogFactory.getLog(RouterWebServiceUtil.class.getName());
 
+  private final static String PARTIAL_REPORT = "Partial Report ";
+
   /** Disable constructor. */
   private RouterWebServiceUtil() {
   }
@@ -225,4 +233,103 @@ public final class RouterWebServiceUtil {
 
   }
 
-}
\ No newline at end of file
+  /**
+   * Merges a list of AppInfo grouping by ApplicationId. Our current policy
+   * is to merge the application reports from the reachable SubClusters.
+   * Via configuration parameter, we decide whether to return applications
+   * for which the primary AM is missing or to omit them.
+   * <p>
+   * An AppInfo is handled as an AM when its AM host HTTP address is set,
+   * otherwise it is handled as an UAM.
+   *
+   * @param appsInfo a list of AppInfo to merge
+   * @param returnPartialResult if the merge AppsInfo should contain partial
+   *          result or not
+   * @return the merged AppsInfo
+   */
+  public static AppsInfo mergeAppsInfo(ArrayList<AppInfo> appsInfo,
+      boolean returnPartialResult) {
+    AppsInfo allApps = new AppsInfo();
+
+    // AMs found so far, indexed by application id
+    Map<String, AppInfo> federationAM = new HashMap<String, AppInfo>();
+    // Sum of the UAMs whose AM has not been found yet, indexed by
+    // application id
+    Map<String, AppInfo> federationUAMSum = new HashMap<String, AppInfo>();
+    for (AppInfo a : appsInfo) {
+      // Check if this AppInfo is an AM
+      if (a.getAMHostHttpAddress() != null) {
+        // Insert in the list of AM
+        federationAM.put(a.getAppId(), a);
+        // Check if there are any UAM found before
+        if (federationUAMSum.containsKey(a.getAppId())) {
+          // Merge the current AM with the found UAM
+          mergeAMWithUAM(a, federationUAMSum.get(a.getAppId()));
+          // Remove the sum of the UAMs
+          federationUAMSum.remove(a.getAppId());
+        }
+      } else {
+        // This AppInfo is an UAM
+        if (federationAM.containsKey(a.getAppId())) {
+          // Merge the current UAM with its own AM
+          mergeAMWithUAM(federationAM.get(a.getAppId()), a);
+        } else if (federationUAMSum.containsKey(a.getAppId())) {
+          // Merge the current UAM with its own UAM and update the list of UAM
+          federationUAMSum.put(a.getAppId(),
+              mergeUAMWithUAM(federationUAMSum.get(a.getAppId()), a));
+        } else {
+          // Insert in the list of UAM
+          federationUAMSum.put(a.getAppId(), a);
+        }
+      }
+    }
+
+    // Check whether the remaining UAMs (no AM found) have to be returned:
+    // always when partial results are enabled, otherwise only when the name
+    // is set and starts neither with the UAM prefix nor with the partial
+    // report prefix
+    for (AppInfo a : federationUAMSum.values()) {
+      if (returnPartialResult || (a.getName() != null
+          && !(a.getName().startsWith(UnmanagedApplicationManager.APP_NAME)
+              || a.getName().startsWith(PARTIAL_REPORT)))) {
+        federationAM.put(a.getAppId(), a);
+      }
+    }
+
+    allApps.addAll(new ArrayList<AppInfo>(federationAM.values()));
+    return allApps;
+  }
+
+  /**
+   * Builds a new partial report out of two UAM reports of the same
+   * application. The new report takes the application id and the state of
+   * the first UAM, is named with the {@code PARTIAL_REPORT} prefix followed
+   * by the application id, and accumulates the values of both UAMs through
+   * {@code mergeAMWithUAM}.
+   *
+   * @param uam1 the first UAM report, source of the application id and state
+   * @param uam2 the second UAM report
+   * @return the newly created partial report
+   */
+  private static AppInfo mergeUAMWithUAM(AppInfo uam1, AppInfo uam2) {
+    AppInfo partialReport = new AppInfo();
+    partialReport.setAppId(uam1.getAppId());
+    partialReport.setName(PARTIAL_REPORT + uam1.getAppId());
+    // We pick the status of the first uam
+    partialReport.setState(uam1.getState());
+    // Merge the newly partial AM with UAM1 and then with UAM2
+    mergeAMWithUAM(partialReport, uam1);
+    mergeAMWithUAM(partialReport, uam2);
+    return partialReport;
+  }
+
+  /**
+   * Adds the values reported by an UAM to the report of its AM. The AM
+   * report is updated in place, the UAM report is only read.
+   * <p>
+   * The preemption values are always summed. The resource requests and the
+   * allocated, reserved, running-container, memory-seconds and vcore-seconds
+   * values are merged only when both reports are in the RUNNING state.
+   *
+   * @param am the AM report that receives the merged values
+   * @param uam the UAM report to merge into the AM report
+   */
+  private static void mergeAMWithUAM(AppInfo am, AppInfo uam) {
+    am.setPreemptedResourceMB(
+        am.getPreemptedResourceMB() + uam.getPreemptedResourceMB());
+    am.setPreemptedResourceVCores(
+        am.getPreemptedResourceVCores() + uam.getPreemptedResourceVCores());
+    am.setNumNonAMContainerPreempted(am.getNumNonAMContainerPreempted()
+        + uam.getNumNonAMContainerPreempted());
+    am.setNumAMContainerPreempted(
+        am.getNumAMContainerPreempted() + uam.getNumAMContainerPreempted());
+    am.setPreemptedMemorySeconds(
+        am.getPreemptedMemorySeconds() + uam.getPreemptedMemorySeconds());
+    am.setPreemptedVcoreSeconds(
+        am.getPreemptedVcoreSeconds() + uam.getPreemptedVcoreSeconds());
+
+    if (am.getState() == YarnApplicationState.RUNNING
+        && uam.getState() == am.getState()) {
+
+      am.getResourceRequests().addAll(uam.getResourceRequests());
+
+      am.setAllocatedMB(am.getAllocatedMB() + uam.getAllocatedMB());
+      am.setAllocatedVCores(am.getAllocatedVCores() + uam.getAllocatedVCores());
+      am.setReservedMB(am.getReservedMB() + uam.getReservedMB());
+      // Reserved vcores must be summed with the UAM reserved vcores, not with
+      // its reserved memory
+      am.setReservedVCores(
+          am.getReservedVCores() + uam.getReservedVCores());
+      am.setRunningContainers(
+          am.getRunningContainers() + uam.getRunningContainers());
+      am.setMemorySeconds(am.getMemorySeconds() + uam.getMemorySeconds());
+      am.setVcoreSeconds(am.getVcoreSeconds() + uam.getVcoreSeconds());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
index 3cdafd8..4c18ace 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -196,6 +196,45 @@ public class TestRouterMetrics {
     Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved());
   }
 
+  /**
+   * This test validates the correctness of the metric: Retrieved Multiple Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededMultipleAppsReport() {
+
+    long totalGoodBefore = metrics.getNumSucceededMultipleAppsRetrieved();
+
+    goodSubCluster.getApplicationsReport(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededMultipleAppsRetrieved());
+    Assert.assertEquals(100, metrics.getLatencySucceededMultipleGetAppReport(),
+        0);
+
+    goodSubCluster.getApplicationsReport(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededMultipleAppsRetrieved());
+    Assert.assertEquals(150, metrics.getLatencySucceededMultipleGetAppReport(),
+        0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to retrieve
+   * Multiple Apps.
+   */
+  @Test
+  public void testMulipleAppsReportFailed() {
+
+    long totalBadbefore = metrics.getMultipleAppsFailedRetrieved();
+
+    badSubCluster.getApplicationsReport();
+
+    Assert.assertEquals(totalBadbefore + 1,
+        metrics.getMultipleAppsFailedRetrieved());
+  }
+
   // Records failures for all calls
   private class MockBadSubCluster {
     public void getNewApplication() {
@@ -217,6 +256,11 @@ public class TestRouterMetrics {
       LOG.info("Mocked: failed getApplicationReport call");
       metrics.incrAppsFailedRetrieved();
     }
+
+    public void getApplicationsReport() {
+      LOG.info("Mocked: failed getApplicationsReport call");
+      metrics.incrMultipleAppsFailedRetrieved();
+    }
   }
 
   // Records successes for all calls
@@ -244,5 +288,11 @@ public class TestRouterMetrics {
           duration);
       metrics.succeededAppsRetrieved(duration);
     }
+
+    public void getApplicationsReport(long duration) {
+      LOG.info("Mocked: successful getApplicationsReport call with duration {}",
+          duration);
+      metrics.succeededMultipleAppsRetrieved(duration);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
index 91e601e..93527e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -18,26 +18,32 @@
 
 package org.apache.hadoop.yarn.server.router.webapp;
 
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceInfo;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.core.HttpHeaders;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
 /**
  * This class mocks the RESTRequestInterceptor.
  */
@@ -101,6 +107,27 @@ public class MockDefaultRequestInterceptorREST
   }
 
   @Override
+  public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
+      Set<String> statesQuery, String finalStatusQuery, String userQuery,
+      String queueQuery, String count, String startedBegin, String startedEnd,
+      String finishBegin, String finishEnd, Set<String> applicationTypes,
+      Set<String> applicationTags, Set<String> unselectedFields) {
+    // Mocks a stopped RM: the call fails instead of returning a report
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+    // The query parameters are ignored: a single new application is returned
+    AppsInfo appsInfo = new AppsInfo();
+    AppInfo appInfo = new AppInfo();
+
+    // The application id is built from the numeric subcluster id and the
+    // next value of the application counter
+    appInfo.setAppId(
+        ApplicationId.newInstance(Integer.valueOf(getSubClusterId().getId()),
+            applicationCounter.incrementAndGet()).toString());
+    // An AppInfo with an AM host address is handled as an AM (not as an UAM)
+    // by RouterWebServiceUtil#mergeAppsInfo
+    appInfo.setAMHostHttpAddress("http://i_am_the_AM:1234");
+
+    appsInfo.add(appInfo);
+    return appsInfo;
+  }
+
+  @Override
   public Response updateAppState(AppState targetState, HttpServletRequest hsr,
       String appId) throws AuthorizationException, YarnException,
       InterruptedException, IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index fb6cdd8..2ee62af 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUt
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
 import org.junit.Assert;
 import org.junit.Test;
@@ -374,4 +375,20 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     Assert.assertNull(response);
   }
 
+  /**
+   * This test validates the correctness of GetApplicationsReport in case each
+   * subcluster provided one application.
+   */
+  @Test
+  public void testGetApplicationsReport()
+      throws YarnException, IOException, InterruptedException {
+
+    AppsInfo responseGet = interceptor.getApps(null, null, null, null, null,
+        null, null, null, null, null, null, null, null, null);
+
+    Assert.assertNotNull(responseGet);
+    Assert.assertEquals(NUM_SUBCLUSTER, responseGet.getApps().size());
+    // The merged operations will be tested in TestRouterWebServiceUtil
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
index 48bc1a8..38b1027 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
 import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
 import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
@@ -271,4 +272,48 @@ public class TestFederationInterceptorRESTRetry
             .getApplicationHomeSubCluster().getHomeSubCluster());
   }
 
+  /**
+   * This test validates the correctness of GetApps in case the cluster is
+   * composed of only 1 bad SubCluster.
+   */
+  @Test
+  public void testGetAppsOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+
+    setupCluster(Arrays.asList(bad2));
+
+    AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
+        null, null, null, null, null, null, null, null);
+    Assert.assertNull(response);
+  }
+
+  /**
+   * This test validates the correctness of GetApps in case the cluster is
+   * composed of only 2 bad SubClusters.
+   */
+  @Test
+  public void testGetAppsTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
+        null, null, null, null, null, null, null, null);
+    Assert.assertNull(response);
+  }
+
+  /**
+   * This test validates the correctness of GetApps in case the cluster is
+   * composed of only 1 bad SubCluster and a good one.
+   */
+  @Test
+  public void testGetAppsOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(good, bad2));
+
+    AppsInfo response = interceptor.getApps(null, null, null, null, null, null,
+        null, null, null, null, null, null, null, null);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(1, response.getApps().size());
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/88b32edb/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
new file mode 100644
index 0000000..810432a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestRouterWebServiceUtil.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ResourceRequestInfo;
+import org.apache.hadoop.yarn.server.uam.UnmanagedApplicationManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test class to validate RouterWebServiceUtil methods.
+ */
+public class TestRouterWebServiceUtil {
+
+  private static final ApplicationId APPID1 = ApplicationId.newInstance(1, 1);
+  private static final ApplicationId APPID2 = ApplicationId.newInstance(2, 1);
+  private static final ApplicationId APPID3 = ApplicationId.newInstance(3, 1);
+  private static final ApplicationId APPID4 = ApplicationId.newInstance(4, 1);
+
+  /**
+   * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+   * in case we want to merge 4 AMs with distinct ids. The expected result
+   * would be the same 4 AMs, each preserving its own state and counters.
+   */
+  @Test
+  public void testMerge4DifferentApps() {
+
+    AppsInfo apps = new AppsInfo();
+    int value = 1000;
+
+    AppInfo app1 = new AppInfo();
+    app1.setAppId(APPID1.toString());
+    app1.setAMHostHttpAddress("http://i_am_the_AM1:1234");
+    app1.setState(YarnApplicationState.FINISHED);
+    app1.setNumAMContainerPreempted(value);
+    apps.add(app1);
+
+    AppInfo app2 = new AppInfo();
+    app2.setAppId(APPID2.toString());
+    app2.setAMHostHttpAddress("http://i_am_the_AM2:1234");
+    app2.setState(YarnApplicationState.ACCEPTED);
+    app2.setAllocatedVCores(2 * value);
+
+    apps.add(app2);
+
+    AppInfo app3 = new AppInfo();
+    app3.setAppId(APPID3.toString());
+    app3.setAMHostHttpAddress("http://i_am_the_AM3:1234");
+    app3.setState(YarnApplicationState.RUNNING);
+    app3.setReservedMB(3 * value);
+    apps.add(app3);
+
+    AppInfo app4 = new AppInfo();
+    app4.setAppId(APPID4.toString());
+    app4.setAMHostHttpAddress("http://i_am_the_AM4:1234");
+    app4.setState(YarnApplicationState.NEW);
+    app4.setAllocatedMB(4 * value);
+    apps.add(app4);
+
+    AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(4, result.getApps().size());
+
+    List<String> appIds = new ArrayList<String>();
+    AppInfo appInfo1 = null, appInfo2 = null, appInfo3 = null, appInfo4 = null;
+    for (AppInfo app : result.getApps()) {
+      appIds.add(app.getAppId());
+      if (app.getAppId().equals(APPID1.toString())) {
+        appInfo1 = app;
+      }
+      if (app.getAppId().equals(APPID2.toString())) {
+        appInfo2 = app;
+      }
+      if (app.getAppId().equals(APPID3.toString())) {
+        appInfo3 = app;
+      }
+      if (app.getAppId().equals(APPID4.toString())) {
+        appInfo4 = app;
+      }
+    }
+
+    Assert.assertTrue(appIds.contains(APPID1.toString()));
+    Assert.assertTrue(appIds.contains(APPID2.toString()));
+    Assert.assertTrue(appIds.contains(APPID3.toString()));
+    Assert.assertTrue(appIds.contains(APPID4.toString()));
+
+    // Check preservations APP1
+    Assert.assertEquals(app1.getState(), appInfo1.getState());
+    Assert.assertEquals(app1.getNumAMContainerPreempted(),
+        appInfo1.getNumAMContainerPreempted());
+
+    // Check preservations APP2
+    Assert.assertEquals(app2.getState(), appInfo2.getState());
+    Assert.assertEquals(app2.getAllocatedVCores(),
+        appInfo2.getAllocatedVCores());
+
+    // Check preservations APP3
+    Assert.assertEquals(app3.getState(), appInfo3.getState());
+    Assert.assertEquals(app3.getReservedMB(), appInfo3.getReservedMB());
+
+    // Check preservations APP4
+    Assert.assertEquals(app4.getState(), appInfo4.getState());
+    Assert.assertEquals(app4.getAllocatedMB(), appInfo4.getAllocatedMB());
+  }
+
+  /**
+   * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+   * in case we want to merge 2 UAMs and their own AM. The status of the AM is
+   * FINISHED, so we check the correctness of the merging of the historical
+   * values. The expected result would be 1 report with the merged information.
+   */
+  @Test
+  public void testMergeAppsFinished() {
+
+    AppsInfo apps = new AppsInfo();
+
+    String amHost = "http://i_am_the_AM1:1234";
+    AppInfo am = new AppInfo();
+    am.setAppId(APPID1.toString());
+    am.setAMHostHttpAddress(amHost);
+    am.setState(YarnApplicationState.FINISHED);
+
+    int value = 1000;
+    setAppInfoFinished(am, value);
+
+    apps.add(am);
+
+    AppInfo uam1 = new AppInfo();
+    uam1.setAppId(APPID1.toString());
+    apps.add(uam1);
+
+    setAppInfoFinished(uam1, value);
+
+    AppInfo uam2 = new AppInfo();
+    uam2.setAppId(APPID1.toString());
+    apps.add(uam2);
+
+    setAppInfoFinished(uam2, value);
+
+    // in this case the result does not change if we enable partial result
+    AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(1, result.getApps().size());
+
+    AppInfo app = result.getApps().get(0);
+
+    Assert.assertEquals(APPID1.toString(), app.getAppId());
+    Assert.assertEquals(amHost, app.getAMHostHttpAddress());
+    Assert.assertEquals(value * 3, app.getPreemptedResourceMB());
+    Assert.assertEquals(value * 3, app.getPreemptedResourceVCores());
+    Assert.assertEquals(value * 3, app.getNumNonAMContainerPreempted());
+    Assert.assertEquals(value * 3, app.getNumAMContainerPreempted());
+    Assert.assertEquals(value * 3, app.getPreemptedMemorySeconds());
+    Assert.assertEquals(value * 3, app.getPreemptedVcoreSeconds());
+  }
+
+  private void setAppInfoFinished(AppInfo am, int value) {
+    am.setPreemptedResourceMB(value);
+    am.setPreemptedResourceVCores(value);
+    am.setNumNonAMContainerPreempted(value);
+    am.setNumAMContainerPreempted(value);
+    am.setPreemptedMemorySeconds(value);
+    am.setPreemptedVcoreSeconds(value);
+  }
+
+  /**
+   * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+   * in case we want to merge 2 UAMs and their own AM. The status of the AM is
+   * RUNNING, so we check the correctness of the merging of the runtime values.
+   * The expected result would be 1 report with the merged information.
+   */
+  @Test
+  public void testMergeAppsRunning() {
+
+    AppsInfo apps = new AppsInfo();
+
+    String amHost = "http://i_am_the_AM2:1234";
+    AppInfo am = new AppInfo();
+    am.setAppId(APPID2.toString());
+    am.setAMHostHttpAddress(amHost);
+    am.setState(YarnApplicationState.RUNNING);
+
+    int value = 1000;
+    setAppInfoRunning(am, value);
+
+    apps.add(am);
+
+    AppInfo uam1 = new AppInfo();
+    uam1.setAppId(APPID2.toString());
+    uam1.setState(YarnApplicationState.RUNNING);
+    apps.add(uam1);
+
+    setAppInfoRunning(uam1, value);
+
+    AppInfo uam2 = new AppInfo();
+    uam2.setAppId(APPID2.toString());
+    uam2.setState(YarnApplicationState.RUNNING);
+    apps.add(uam2);
+
+    setAppInfoRunning(uam2, value);
+
+    // in this case the result does not change if we enable partial result
+    AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(1, result.getApps().size());
+
+    AppInfo app = result.getApps().get(0);
+
+    Assert.assertEquals(APPID2.toString(), app.getAppId());
+    Assert.assertEquals(amHost, app.getAMHostHttpAddress());
+    Assert.assertEquals(value * 3, app.getAllocatedMB());
+    Assert.assertEquals(value * 3, app.getAllocatedVCores());
+    Assert.assertEquals(value * 3, app.getReservedMB());
+    Assert.assertEquals(value * 3, app.getReservedVCores());
+    Assert.assertEquals(value * 3, app.getRunningContainers());
+    Assert.assertEquals(value * 3, app.getMemorySeconds());
+    Assert.assertEquals(value * 3, app.getVcoreSeconds());
+    Assert.assertEquals(3, app.getResourceRequests().size());
+  }
+
+  private void setAppInfoRunning(AppInfo am, int value) {
+    am.getResourceRequests().add(new ResourceRequestInfo());
+    am.setAllocatedMB(value);
+    am.setAllocatedVCores(value);
+    am.setReservedMB(value);
+    am.setReservedVCores(value);
+    am.setRunningContainers(value);
+    am.setMemorySeconds(value);
+    am.setVcoreSeconds(value);
+  }
+
+  /**
+   * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+   * in case we want to merge 2 UAMs without their own AM. The expected result
+   * would be an empty report or a partial report of the 2 UAMs depending on the
+   * selected policy.
+   */
+  @Test
+  public void testMerge2UAM() {
+
+    AppsInfo apps = new AppsInfo();
+
+    AppInfo app1 = new AppInfo();
+    app1.setAppId(APPID1.toString());
+    app1.setName(UnmanagedApplicationManager.APP_NAME);
+    app1.setState(YarnApplicationState.RUNNING);
+    apps.add(app1);
+
+    AppInfo app2 = new AppInfo();
+    app2.setAppId(APPID1.toString());
+    app2.setName(UnmanagedApplicationManager.APP_NAME);
+    app2.setState(YarnApplicationState.RUNNING);
+    apps.add(app2);
+
+    AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(0, result.getApps().size());
+
+    // By enabling partial result, the expected result would be a partial report
+    // of the 2 UAMs
+    AppsInfo result2 = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), true);
+    Assert.assertNotNull(result2);
+    Assert.assertEquals(1, result2.getApps().size());
+    Assert.assertEquals(YarnApplicationState.RUNNING,
+        result2.getApps().get(0).getState());
+  }
+
+  /**
+   * This test validates the correctness of RouterWebServiceUtil#mergeAppsInfo
+   * in case we want to merge 1 UAM that does not depend on Federation. The
+   * expected result would be the same app report.
+   */
+  @Test
+  public void testMergeUAM() {
+
+    AppsInfo apps = new AppsInfo();
+
+    AppInfo app1 = new AppInfo();
+    app1.setAppId(APPID1.toString());
+    app1.setName("Test");
+    apps.add(app1);
+
+    // in this case the result does not change if we enable partial result
+    AppsInfo result = RouterWebServiceUtil.mergeAppsInfo(apps.getApps(), false);
+    Assert.assertNotNull(result);
+    Assert.assertEquals(1, result.getApps().size());
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[5/8] hadoop git commit: YARN-5603. Metrics for Federation StateStore. (Ellen Hui via asuresh)

Posted by cu...@apache.org.
YARN-5603. Metrics for Federation StateStore. (Ellen Hui via asuresh)

(cherry picked from commit 75abc9a8e2cf1c7d2c574ede720df59421512be3)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ac090b38
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ac090b38
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ac090b38

Branch: refs/heads/branch-2
Commit: ac090b38ad54f78f59ec2ec0f73c6c4d7664d4cb
Parents: 261f769
Author: Arun Suresh <as...@apache.org>
Authored: Mon Aug 21 22:43:08 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:09:30 2017 -0700

----------------------------------------------------------------------
 .../store/impl/SQLFederationStateStore.java     |  79 ++++++++
 .../FederationStateStoreClientMetrics.java      | 184 +++++++++++++++++++
 .../federation/store/metrics/package-info.java  |  17 ++
 .../TestFederationStateStoreClientMetrics.java  | 146 +++++++++++++++
 4 files changed, 426 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac090b38/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
index 63d8e42..533f9c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/SQLFederationStateStore.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -72,6 +73,8 @@ import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembership
 import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
 import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
 import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +140,7 @@ public class SQLFederationStateStore implements FederationStateStore {
   private String url;
   private int maximumPoolSize;
   private HikariDataSource dataSource = null;
+  private final Clock clock = new MonotonicClock();
 
   @Override
   public void init(Configuration conf) throws YarnException {
@@ -203,7 +207,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(9, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not add a new subcluster into FederationStateStore
@@ -222,8 +228,11 @@ public class SQLFederationStateStore implements FederationStateStore {
 
       LOG.info(
           "Registered the SubCluster " + subClusterId + " into the StateStore");
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to register the SubCluster " + subClusterId
               + " into the StateStore",
@@ -260,7 +269,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not deregister the subcluster into FederationStateStore
@@ -278,8 +289,11 @@ public class SQLFederationStateStore implements FederationStateStore {
 
       LOG.info("Deregistered the SubCluster " + subClusterId + " state to "
           + state.toString());
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to deregister the sub-cluster " + subClusterId + " state to "
               + state.toString(),
@@ -317,7 +331,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not update the subcluster into FederationStateStore
@@ -336,8 +352,11 @@ public class SQLFederationStateStore implements FederationStateStore {
 
       LOG.info("Heartbeated the StateStore for the specified SubCluster "
           + subClusterId);
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to heartbeat the StateStore for the specified SubCluster "
               + subClusterId,
@@ -378,7 +397,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(9, java.sql.Types.VARCHAR);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.execute();
+      long stopTime = clock.getTime();
 
       String amRMAddress = cstmt.getString(2);
       String clientRMAddress = cstmt.getString(3);
@@ -403,6 +424,9 @@ public class SQLFederationStateStore implements FederationStateStore {
           clientRMAddress, rmAdminAddress, webAppAddress, lastHeartBeat, state,
           lastStartTime, capability);
 
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
       // Check if the output it is a valid subcluster
       try {
         FederationMembershipStateStoreInputValidator
@@ -417,6 +441,7 @@ public class SQLFederationStateStore implements FederationStateStore {
             + subClusterInfo.toString());
       }
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the SubCluster information for " + subClusterId, e);
     } finally {
@@ -439,7 +464,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt = conn.prepareCall(CALL_SP_GET_SUBCLUSTERS);
 
       // Execute the query
+      long startTime = clock.getTime();
       rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
 
       while (rs.next()) {
 
@@ -459,6 +486,10 @@ public class SQLFederationStateStore implements FederationStateStore {
             amRMAddress, clientRMAddress, rmAdminAddress, webAppAddress,
             lastHeartBeat, state, lastStartTime, capability);
 
+        FederationStateStoreClientMetrics
+            .succeededStateStoreCall(stopTime - startTime);
+
+
         // Check if the output it is a valid subcluster
         try {
           FederationMembershipStateStoreInputValidator
@@ -477,6 +508,7 @@ public class SQLFederationStateStore implements FederationStateStore {
       }
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the information for all the SubClusters ", e);
     } finally {
@@ -513,11 +545,16 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       subClusterHome = cstmt.getString(3);
       SubClusterId subClusterIdHome = SubClusterId.newInstance(subClusterHome);
 
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
       // For failover reason, we check the returned SubClusterId.
       // If it is equal to the subclusterId we sent, the call added the new
       // application into FederationStateStore. If the call returns a different
@@ -554,6 +591,7 @@ public class SQLFederationStateStore implements FederationStateStore {
       }
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils
           .logAndThrowRetriableException(LOG,
               "Unable to insert the newly generated application "
@@ -592,7 +630,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(3, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not update the application into FederationStateStore
@@ -611,8 +651,11 @@ public class SQLFederationStateStore implements FederationStateStore {
       LOG.info(
           "Update the SubCluster to {} for application {} in the StateStore",
           subClusterId, appId);
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils
           .logAndThrowRetriableException(LOG,
               "Unable to update the application "
@@ -645,7 +688,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(2, java.sql.Types.VARCHAR);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.execute();
+      long stopTime = clock.getTime();
 
       if (cstmt.getString(2) != null) {
         homeRM = SubClusterId.newInstance(cstmt.getString(2));
@@ -659,7 +704,12 @@ public class SQLFederationStateStore implements FederationStateStore {
         LOG.debug("Got the information about the specified application  "
             + request.getApplicationId() + ". The AM is running in " + homeRM);
       }
+
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the application information "
               + "for the specified application " + request.getApplicationId(),
@@ -688,7 +738,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt = conn.prepareCall(CALL_SP_GET_APPLICATIONS_HOME_SUBCLUSTER);
 
       // Execute the query
+      long startTime = clock.getTime();
       rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
 
       while (rs.next()) {
 
@@ -701,7 +753,11 @@ public class SQLFederationStateStore implements FederationStateStore {
             SubClusterId.newInstance(homeSubCluster)));
       }
 
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the information for all the applications ", e);
     } finally {
@@ -731,7 +787,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(2, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not delete the application from FederationStateStore
@@ -750,8 +808,11 @@ public class SQLFederationStateStore implements FederationStateStore {
 
       LOG.info("Delete from the StateStore the application: {}",
           request.getApplicationId());
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to delete the application " + request.getApplicationId(), e);
     } finally {
@@ -782,7 +843,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(3, java.sql.Types.VARBINARY);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check if the output it is a valid policy
       if (cstmt.getString(2) != null && cstmt.getBytes(3) != null) {
@@ -798,7 +861,11 @@ public class SQLFederationStateStore implements FederationStateStore {
         return null;
       }
 
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to select the policy for the queue :" + request.getQueue(),
           e);
@@ -833,7 +900,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt.registerOutParameter(4, java.sql.Types.INTEGER);
 
       // Execute the query
+      long startTime = clock.getTime();
       cstmt.executeUpdate();
+      long stopTime = clock.getTime();
 
       // Check the ROWCOUNT value, if it is equal to 0 it means the call
       // did not add a new policy into FederationStateStore
@@ -852,8 +921,11 @@ public class SQLFederationStateStore implements FederationStateStore {
 
       LOG.info("Insert into the state store the policy for the queue: "
           + policyConf.getQueue());
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
 
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to insert the newly generated policy for the queue :"
               + policyConf.getQueue(),
@@ -880,7 +952,9 @@ public class SQLFederationStateStore implements FederationStateStore {
       cstmt = conn.prepareCall(CALL_SP_GET_POLICIES_CONFIGURATIONS);
 
       // Execute the query
+      long startTime = clock.getTime();
       rs = cstmt.executeQuery();
+      long stopTime = clock.getTime();
 
       while (rs.next()) {
 
@@ -894,7 +968,12 @@ public class SQLFederationStateStore implements FederationStateStore {
                 ByteBuffer.wrap(policyInfo));
         policyConfigurations.add(subClusterPolicyConfiguration);
       }
+
+      FederationStateStoreClientMetrics
+          .succeededStateStoreCall(stopTime - startTime);
+
     } catch (SQLException e) {
+      FederationStateStoreClientMetrics.failedStateStoreCall();
       FederationStateStoreUtils.logAndThrowRetriableException(LOG,
           "Unable to obtain the policy information for all the queues.", e);
     } finally {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac090b38/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
new file mode 100644
index 0000000..27b46cd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/FederationStateStoreClientMetrics.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.metrics;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsSource;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Performance metrics for FederationStateStore implementations.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+@Metrics(about = "Performance and usage metrics for Federation StateStore",
+         context = "fedr")
+public final class FederationStateStoreClientMetrics implements MetricsSource {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(FederationStateStoreClientMetrics.class);
+
+  private static final MetricsRegistry REGISTRY =
+      new MetricsRegistry("FederationStateStoreClientMetrics");
+  private final static Method[] STATESTORE_API_METHODS =
+      FederationStateStore.class.getMethods();
+
+  // Map method names to counter objects
+  private static final Map<String, MutableCounterLong> API_TO_FAILED_CALLS =
+      new HashMap<String, MutableCounterLong>();
+  private static final Map<String, MutableRate> API_TO_SUCCESSFUL_CALLS =
+      new HashMap<String, MutableRate>();
+
+  // Provide quantile latency for each api call.
+  private static final Map<String, MutableQuantiles> API_TO_QUANTILE_METRICS =
+      new HashMap<String, MutableQuantiles>();
+
+  // Error string templates for logging calls from methods not in
+  // FederationStateStore API
+  private static final String UNKOWN_FAIL_ERROR_MSG =
+      "Not recording failed call for unknown FederationStateStore method {}";
+  private static final String UNKNOWN_SUCCESS_ERROR_MSG =
+      "Not recording successful call for unknown "
+          + "FederationStateStore method {}";
+
+  // Aggregate metrics are shared, and don't have to be looked up per call
+  @Metric("Total number of successful calls and latency(ms)")
+  private static MutableRate totalSucceededCalls;
+
+  @Metric("Total number of failed StateStore calls")
+  private static MutableCounterLong totalFailedCalls;
+
+  // This must come after the static members are initialized, or the
+  // constructor will throw a NullPointerException
+  private static final FederationStateStoreClientMetrics S_INSTANCE =
+      DefaultMetricsSystem.instance()
+          .register(new FederationStateStoreClientMetrics());
+
+  synchronized public static FederationStateStoreClientMetrics getInstance() {
+    return S_INSTANCE;
+  }
+
+  private FederationStateStoreClientMetrics() {
+    // Create the metrics for each method and put them into the map
+    for (Method m : STATESTORE_API_METHODS) {
+      String methodName = m.getName();
+      LOG.debug("Registering Federation StateStore Client metrics for {}",
+          methodName);
+
+      // This metric only records the number of failed calls; it does not
+      // capture latency information
+      API_TO_FAILED_CALLS.put(methodName,
+          REGISTRY.newCounter(methodName + "_numFailedCalls",
+              "# failed calls to " + methodName, 0L));
+
+      // This metric records both the number and average latency of successful
+      // calls.
+      API_TO_SUCCESSFUL_CALLS.put(methodName,
+          REGISTRY.newRate(methodName + "_successfulCalls",
+              "# successful calls and latency(ms) for" + methodName));
+
+      // This metric records the quantile-based latency of each successful call,
+      // re-sampled every 10 seconds.
+      API_TO_QUANTILE_METRICS.put(methodName,
+          REGISTRY.newQuantiles(methodName + "Latency",
+              "Quantile latency (ms) for " + methodName, "ops", "latency", 10));
+    }
+  }
+
+  public static void failedStateStoreCall() {
+    String methodName =
+        Thread.currentThread().getStackTrace()[2].getMethodName();
+    MutableCounterLong methodMetric = API_TO_FAILED_CALLS.get(methodName);
+    if (methodMetric == null) {
+      LOG.error(UNKOWN_FAIL_ERROR_MSG, methodName);
+      return;
+    }
+
+    totalFailedCalls.incr();
+    methodMetric.incr();
+  }
+
+  public static void succeededStateStoreCall(long duration) {
+    String methodName =
+        Thread.currentThread().getStackTrace()[2].getMethodName();
+    MutableRate methodMetric = API_TO_SUCCESSFUL_CALLS.get(methodName);
+    MutableQuantiles methodQuantileMetric =
+        API_TO_QUANTILE_METRICS.get(methodName);
+    if (methodMetric == null || methodQuantileMetric == null) {
+      LOG.error(UNKNOWN_SUCCESS_ERROR_MSG, methodName);
+      return;
+    }
+
+    totalSucceededCalls.add(duration);
+    methodMetric.add(duration);
+    methodQuantileMetric.add(duration);
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    REGISTRY.snapshot(collector.addRecord(REGISTRY.info()), all);
+  }
+
+  // Getters for unit testing
+  @VisibleForTesting
+  static long getNumFailedCallsForMethod(String methodName) {
+    return API_TO_FAILED_CALLS.get(methodName).value();
+  }
+
+  @VisibleForTesting
+  static long getNumSucceessfulCallsForMethod(String methodName) {
+    return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  static double getLatencySucceessfulCallsForMethod(String methodName) {
+    return API_TO_SUCCESSFUL_CALLS.get(methodName).lastStat().mean();
+  }
+
+  @VisibleForTesting
+  static long getNumFailedCalls() {
+    return totalFailedCalls.value();
+  }
+
+  @VisibleForTesting
+  static long getNumSucceededCalls() {
+    return totalSucceededCalls.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  static double getLatencySucceededCalls() {
+    return totalSucceededCalls.lastStat().mean();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac090b38/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
new file mode 100644
index 0000000..eb548f4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/metrics/package-info.java
@@ -0,0 +1,17 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.store.metrics;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ac090b38/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
new file mode 100644
index 0000000..241d5e2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/metrics/TestFederationStateStoreClientMetrics.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.metrics;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for {@link FederationStateStoreClientMetrics}.
+ *
+ */
+public class TestFederationStateStoreClientMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationStateStoreClientMetrics.class);
+
+  private MockBadFederationStateStore badStateStore =
+      new MockBadFederationStateStore();
+  private MockGoodFederationStateStore goodStateStore =
+      new MockGoodFederationStateStore();
+
+  @Test
+  public void testAggregateMetricInit() {
+    LOG.info("Test: aggregate metrics are initialized correctly");
+
+    Assert.assertEquals(0,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(0,
+        FederationStateStoreClientMetrics.getNumFailedCalls());
+
+    LOG.info("Test: aggregate metrics are updated correctly");
+  }
+
+  @Test
+  public void testSuccessfulCalls() {
+    LOG.info("Test: Aggregate and method successful calls updated correctly");
+
+    long totalGoodBefore =
+        FederationStateStoreClientMetrics.getNumSucceededCalls();
+    long apiGoodBefore = FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster");
+
+    goodStateStore.registerSubCluster(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(100,
+        FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0);
+    Assert.assertEquals(apiGoodBefore + 1,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(100, FederationStateStoreClientMetrics
+        .getLatencySucceessfulCallsForMethod("registerSubCluster"), 0);
+
+    LOG.info("Test: Running stats correctly calculated for 2 metrics");
+
+    goodStateStore.registerSubCluster(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(150,
+        FederationStateStoreClientMetrics.getLatencySucceededCalls(), 0);
+    Assert.assertEquals(apiGoodBefore + 2,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(150, FederationStateStoreClientMetrics
+        .getLatencySucceessfulCallsForMethod("registerSubCluster"), 0);
+
+  }
+
+  @Test
+  public void testFailedCalls() {
+
+    long totalBadbefore = FederationStateStoreClientMetrics.getNumFailedCalls();
+    long apiBadBefore = FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster");
+
+    badStateStore.registerSubCluster();
+
+    LOG.info("Test: Aggregate and method failed calls updated correctly");
+    Assert.assertEquals(totalBadbefore + 1,
+        FederationStateStoreClientMetrics.getNumFailedCalls());
+    Assert.assertEquals(apiBadBefore + 1, FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster"));
+
+  }
+
+  @Test
+  public void testCallsUnknownMethod() {
+
+    long totalBadbefore = FederationStateStoreClientMetrics.getNumFailedCalls();
+    long apiBadBefore = FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster");
+    long totalGoodBefore =
+        FederationStateStoreClientMetrics.getNumSucceededCalls();
+    long apiGoodBefore = FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster");
+
+    LOG.info("Calling Metrics class directly");
+    FederationStateStoreClientMetrics.failedStateStoreCall();
+    FederationStateStoreClientMetrics.succeededStateStoreCall(100);
+
+    LOG.info("Test: Aggregate and method calls did not update");
+    Assert.assertEquals(totalBadbefore,
+        FederationStateStoreClientMetrics.getNumFailedCalls());
+    Assert.assertEquals(apiBadBefore, FederationStateStoreClientMetrics
+        .getNumFailedCallsForMethod("registerSubCluster"));
+
+    Assert.assertEquals(totalGoodBefore,
+        FederationStateStoreClientMetrics.getNumSucceededCalls());
+    Assert.assertEquals(apiGoodBefore, FederationStateStoreClientMetrics
+        .getNumSucceessfulCallsForMethod("registerSubCluster"));
+
+  }
+
+  // Records failures for all calls
+  private class MockBadFederationStateStore {
+    public void registerSubCluster() {
+      LOG.info("Mocked: failed registerSubCluster call");
+      FederationStateStoreClientMetrics.failedStateStoreCall();
+    }
+  }
+
+  // Records successes for all calls
+  private class MockGoodFederationStateStore {
+    public void registerSubCluster(long duration) {
+      LOG.info("Mocked: successful registerSubCluster call with duration {}",
+          duration);
+      FederationStateStoreClientMetrics.succeededStateStoreCall(duration);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[4/8] hadoop git commit: YARN-6900. ZooKeeper based implementation of the FederationStateStore. (Íñigo Goiri via Subru).

Posted by cu...@apache.org.
YARN-6900. ZooKeeper based implementation of the FederationStateStore. (Íñigo Goiri via Subru).

(cherry picked from commit de462da04e167a04b89ecf0f40d464cf39dc6549)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/261f769d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/261f769d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/261f769d

Branch: refs/heads/branch-2
Commit: 261f769d797b61839b40873f0df13aa58e86f3f9
Parents: 9ad067e
Author: Subru Krishnan <su...@apache.org>
Authored: Wed Aug 16 11:43:24 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:09:23 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   8 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   4 +
 .../hadoop-yarn-server-common/pom.xml           |   5 +
 .../impl/ZookeeperFederationStateStore.java     | 634 +++++++++++++++++++
 .../impl/TestZookeeperFederationStateStore.java |  89 +++
 .../TestFederationStateStoreFacadeRetry.java    |  20 +-
 .../src/site/markdown/Federation.md             |  56 +-
 7 files changed, 785 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/261f769d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index c34c076..bf18ade 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2561,6 +2561,14 @@ public class YarnConfiguration extends Configuration {
 
   public static final String DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS = "";
 
+  public static final String FEDERATION_STATESTORE_ZK_PREFIX =
+      FEDERATION_PREFIX + "zk-state-store.";
+  /** Parent znode path under which the Federation StateStore creates znodes. */
+  public static final String FEDERATION_STATESTORE_ZK_PARENT_PATH =
+      FEDERATION_STATESTORE_ZK_PREFIX + "parent-path";
+  public static final String DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH =
+      "/federationstore";
+
   private static final String FEDERATION_STATESTORE_SQL_PREFIX =
       FEDERATION_PREFIX + "state-store.sql.";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/261f769d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index d6c619d..ad38051 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -98,6 +98,10 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
     configurationPropsToSkipCompare
         .add(YarnConfiguration.DEFAULT_FEDERATION_POLICY_MANAGER_PARAMS);
 
+    // Federation StateStore ZK implementation configs to be ignored
+    configurationPropsToSkipCompare.add(
+        YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH);
+
     // Federation StateStore SQL implementation configs to be ignored
     configurationPropsToSkipCompare
         .add(YarnConfiguration.FEDERATION_STATESTORE_SQL_JDBC_CLASS);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/261f769d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 8c754fa..f484e35 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -137,6 +137,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/261f769d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
new file mode 100644
index 0000000..6ae7d3c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
@@ -0,0 +1,634 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.List;
+import java.util.TimeZone;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.curator.ZKCuratorManager;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterIdProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterInfoProto;
+import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.SubClusterPolicyConfigurationProto;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
+import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * ZooKeeper implementation of {@link FederationStateStore}.
+ *
+ * The znode structure is as follows:
+ * ROOT_DIR_PATH
+ * |--- MEMBERSHIP
+ * |     |----- SC1
+ * |     |----- SC2
+ * |--- APPLICATION
+ * |     |----- APP1
+ * |     |----- APP2
+ * |--- POLICY
+ *       |----- QUEUE1
+ *       |----- QUEUE2
+ */
+public class ZookeeperFederationStateStore implements FederationStateStore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ZookeeperFederationStateStore.class);
+
+  private final static String ROOT_ZNODE_NAME_MEMBERSHIP = "memberships";
+  private final static String ROOT_ZNODE_NAME_APPLICATION = "applications";
+  private final static String ROOT_ZNODE_NAME_POLICY = "policies";
+
+  /** Interface to Zookeeper. */
+  private ZKCuratorManager zkManager;
+
+  /** Directory to store the state store data. */
+  private String baseZNode;
+
+  private String appsZNode;
+  private String membershipZNode;
+  private String policiesZNode;
+
+  @Override
+  public void init(Configuration conf) throws YarnException {
+    LOG.info("Initializing ZooKeeper connection");
+
+    baseZNode = conf.get(
+        YarnConfiguration.FEDERATION_STATESTORE_ZK_PARENT_PATH,
+        YarnConfiguration.DEFAULT_FEDERATION_STATESTORE_ZK_PARENT_PATH);
+    try {
+      this.zkManager = new ZKCuratorManager(conf);
+      this.zkManager.start();
+    } catch (IOException e) {
+      LOG.error("Cannot initialize the ZK connection", e);
+    }
+
+    // Base znodes
+    membershipZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_MEMBERSHIP);
+    appsZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_APPLICATION);
+    policiesZNode = getNodePath(baseZNode, ROOT_ZNODE_NAME_POLICY);
+
+    // Create base znode for each entity
+    try {
+      zkManager.createRootDirRecursively(membershipZNode);
+      zkManager.createRootDirRecursively(appsZNode);
+      zkManager.createRootDirRecursively(policiesZNode);
+    } catch (Exception e) {
+      String errMsg = "Cannot create base directories: " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+
+  }
+
+  @Override
+  public void close() throws Exception {
+    if (zkManager != null) {
+      zkManager.close();
+    }
+  }
+
+  @Override
+  public AddApplicationHomeSubClusterResponse addApplicationHomeSubCluster(
+      AddApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+    ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
+    ApplicationId appId = app.getApplicationId();
+
+    // Try to write the subcluster
+    SubClusterId homeSubCluster = app.getHomeSubCluster();
+    try {
+      putApp(appId, homeSubCluster, false);
+    } catch (Exception e) {
+      String errMsg = "Cannot add application home subcluster for " + appId;
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+
+    // Check for the actual subcluster
+    try {
+      homeSubCluster = getApp(appId);
+    } catch (Exception e) {
+      String errMsg = "Cannot check app home subcluster for " + appId;
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+
+    return AddApplicationHomeSubClusterResponse
+        .newInstance(homeSubCluster);
+  }
+
+  @Override
+  public UpdateApplicationHomeSubClusterResponse
+      updateApplicationHomeSubCluster(
+          UpdateApplicationHomeSubClusterRequest request)
+              throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+    ApplicationHomeSubCluster app = request.getApplicationHomeSubCluster();
+    ApplicationId appId = app.getApplicationId();
+    SubClusterId homeSubCluster = getApp(appId);
+    if (homeSubCluster == null) {
+      String errMsg = "Application " + appId + " does not exist";
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+    SubClusterId newSubClusterId =
+        request.getApplicationHomeSubCluster().getHomeSubCluster();
+    putApp(appId, newSubClusterId, true);
+    return UpdateApplicationHomeSubClusterResponse.newInstance();
+  }
+
+  /**
+   * Look up the home subcluster of a single application.
+   *
+   * @param request Request carrying the application identifier.
+   * @return Response wrapping the application and its home subcluster.
+   * @throws YarnException If the application has no entry or the store
+   *           cannot be read.
+   */
+  @Override
+  public GetApplicationHomeSubClusterResponse getApplicationHomeSubCluster(
+      GetApplicationHomeSubClusterRequest request) throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+    final ApplicationId appId = request.getApplicationId();
+    final SubClusterId home = getApp(appId);
+    if (home == null) {
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "Application " + appId + " does not exist");
+    }
+    final ApplicationHomeSubCluster mapping =
+        ApplicationHomeSubCluster.newInstance(appId, home);
+    return GetApplicationHomeSubClusterResponse.newInstance(mapping);
+  }
+
+  /**
+   * List every application stored under the applications znode together
+   * with its home subcluster.
+   *
+   * @param request Request (not inspected by this implementation).
+   * @return Response wrapping one entry per child znode.
+   * @throws YarnException If the children cannot be listed or read.
+   */
+  @Override
+  public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubCluster(
+      GetApplicationsHomeSubClusterRequest request) throws YarnException {
+    List<ApplicationHomeSubCluster> apps = new ArrayList<>();
+
+    try {
+      // Each child znode is named after the application it describes
+      for (String znodeName : zkManager.getChildren(appsZNode)) {
+        ApplicationId appId = ApplicationId.fromString(znodeName);
+        apps.add(ApplicationHomeSubCluster.newInstance(appId, getApp(appId)));
+      }
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "Cannot get apps: " + e.getMessage());
+    }
+
+    return GetApplicationsHomeSubClusterResponse.newInstance(apps);
+  }
+
+  @Override
+  public DeleteApplicationHomeSubClusterResponse
+      deleteApplicationHomeSubCluster(
+          DeleteApplicationHomeSubClusterRequest request)
+              throws YarnException {
+
+    FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+    ApplicationId appId = request.getApplicationId();
+    String appZNode = getNodePath(appsZNode, appId.toString());
+
+    boolean exists = false;
+    try {
+      exists = zkManager.exists(appZNode);
+    } catch (Exception e) {
+      String errMsg = "Cannot check app: " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+    if (!exists) {
+      String errMsg = "Application " + appId + " does not exist";
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+
+    try {
+      zkManager.delete(appZNode);
+    } catch (Exception e) {
+      String errMsg = "Cannot delete app: " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+
+    return DeleteApplicationHomeSubClusterResponse.newInstance();
+  }
+
+  /**
+   * Store the information of a subcluster, overwriting any previous entry.
+   *
+   * @param request Request carrying the subcluster information.
+   * @return Empty response once the information has been written.
+   * @throws YarnException If the information cannot be written.
+   */
+  @Override
+  public SubClusterRegisterResponse registerSubCluster(
+      SubClusterRegisterRequest request) throws YarnException {
+    FederationMembershipStateStoreInputValidator.validate(request);
+    final SubClusterInfo info = request.getSubClusterInfo();
+    final SubClusterId id = info.getSubClusterId();
+
+    // Stamp the record with the current time as its last heartbeat
+    info.setLastHeartBeat(getCurrentTime());
+
+    try {
+      putSubclusterInfo(id, info, true);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "Cannot register subcluster: " + e.getMessage());
+    }
+    return SubClusterRegisterResponse.newInstance();
+  }
+
+  /**
+   * Record the requested state on a stored subcluster.
+   *
+   * @param request Request carrying the subcluster identifier and state.
+   * @return Empty response once the state has been written.
+   * @throws YarnException If the subcluster is not stored or the store
+   *           cannot be read or written.
+   */
+  @Override
+  public SubClusterDeregisterResponse deregisterSubCluster(
+      SubClusterDeregisterRequest request) throws YarnException {
+    FederationMembershipStateStoreInputValidator.validate(request);
+    final SubClusterId id = request.getSubClusterId();
+    final SubClusterState newState = request.getState();
+
+    // Read-modify-write of the stored record
+    final SubClusterInfo info = getSubclusterInfo(id);
+    if (info != null) {
+      info.setState(newState);
+      putSubclusterInfo(id, info, true);
+    } else {
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "SubCluster " + id + " not found");
+    }
+
+    return SubClusterDeregisterResponse.newInstance();
+  }
+
+  /**
+   * Refresh the heartbeat time, state and capability of a stored subcluster.
+   *
+   * @param request Request carrying the identifier, state and capability.
+   * @return Empty response once the record has been written.
+   * @throws YarnException If the subcluster is not stored or the store
+   *           cannot be read or written.
+   */
+  @Override
+  public SubClusterHeartbeatResponse subClusterHeartbeat(
+      SubClusterHeartbeatRequest request) throws YarnException {
+
+    FederationMembershipStateStoreInputValidator.validate(request);
+    final SubClusterId id = request.getSubClusterId();
+
+    final SubClusterInfo info = getSubclusterInfo(id);
+    if (info == null) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "SubCluster " + id + " does not exist; cannot heartbeat");
+    }
+
+    // Overwrite the stored record with the values reported in the request
+    info.setLastHeartBeat(getCurrentTime());
+    info.setState(request.getState());
+    info.setCapability(request.getCapability());
+    putSubclusterInfo(id, info, true);
+
+    return SubClusterHeartbeatResponse.newInstance();
+  }
+
+  @Override
+  public GetSubClusterInfoResponse getSubCluster(
+      GetSubClusterInfoRequest request) throws YarnException {
+
+    FederationMembershipStateStoreInputValidator.validate(request);
+    SubClusterId subClusterId = request.getSubClusterId();
+    SubClusterInfo subClusterInfo = null;
+    try {
+      subClusterInfo = getSubclusterInfo(subClusterId);
+      if (subClusterInfo == null) {
+        LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
+        return null;
+      }
+    } catch (Exception e) {
+      String errMsg = "Cannot get subcluster: " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+    return GetSubClusterInfoResponse.newInstance(subClusterInfo);
+  }
+
+  /**
+   * List the subclusters stored under the membership znode.
+   *
+   * @param request Request stating whether inactive subclusters are to be
+   *          filtered out.
+   * @return Response wrapping the selected subclusters.
+   * @throws YarnException If the children cannot be listed or read.
+   */
+  @Override
+  public GetSubClustersInfoResponse getSubClusters(
+      GetSubClustersInfoRequest request) throws YarnException {
+    List<SubClusterInfo> selected = new ArrayList<>();
+
+    try {
+      for (String znodeName : zkManager.getChildren(membershipZNode)) {
+        SubClusterInfo info =
+            getSubclusterInfo(SubClusterId.newInstance(znodeName));
+        // Drop inactive members only when the caller asked for the filter
+        if (request.getFilterInactiveSubClusters()
+            && !info.getState().isActive()) {
+          continue;
+        }
+        selected.add(info);
+      }
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "Cannot get subclusters: " + e.getMessage());
+    }
+    return GetSubClustersInfoResponse.newInstance(selected);
+  }
+
+
+
+  @Override
+  public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+      GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    FederationPolicyStoreInputValidator.validate(request);
+    String queue = request.getQueue();
+    SubClusterPolicyConfiguration policy = null;
+    try {
+      policy = getPolicy(queue);
+    } catch (Exception e) {
+      String errMsg = "Cannot get policy: " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+
+    if (policy == null) {
+      LOG.warn("Policy for queue: {} does not exist.", queue);
+      return null;
+    }
+    return GetSubClusterPolicyConfigurationResponse
+        .newInstance(policy);
+  }
+
+  /**
+   * Store the policy configuration of a queue, overwriting any previous one.
+   *
+   * @param request Request carrying the policy configuration.
+   * @return Empty response once the policy has been written.
+   * @throws YarnException If the policy cannot be written.
+   */
+  @Override
+  public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+      SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+
+    FederationPolicyStoreInputValidator.validate(request);
+    final SubClusterPolicyConfiguration newPolicy =
+        request.getPolicyConfiguration();
+    try {
+      // The policy is keyed by the queue it names
+      putPolicy(newPolicy.getQueue(), newPolicy, true);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "Cannot set policy: " + e.getMessage());
+    }
+    return SetSubClusterPolicyConfigurationResponse.newInstance();
+  }
+
+  /**
+   * List the policy configurations stored under the policies znode.
+   *
+   * @param request Request (not inspected by this implementation).
+   * @return Response wrapping one policy per child znode.
+   * @throws YarnException If the children cannot be listed or read.
+   */
+  @Override
+  public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+      GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+    List<SubClusterPolicyConfiguration> policies = new ArrayList<>();
+
+    try {
+      // Each child znode is named after the queue it configures
+      for (String queue : zkManager.getChildren(policiesZNode)) {
+        policies.add(getPolicy(queue));
+      }
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "Cannot get policies: " + e.getMessage());
+    }
+    return GetSubClusterPoliciesConfigurationsResponse.newInstance(policies);
+  }
+
+  /**
+   * This implementation does not report a version.
+   * @return Always null.
+   */
+  @Override
+  public Version getCurrentVersion() {
+    return null;
+  }
+
+  /**
+   * This implementation does not load a version from the store.
+   * @return Always null.
+   */
+  @Override
+  public Version loadVersion() {
+    return null;
+  }
+
+  /**
+   * Get the subcluster for an application.
+   * @param appId Application identifier.
+   * @return Subcluster identifier.
+   * @throws Exception If it cannot contact ZooKeeper.
+   */
+  private SubClusterId getApp(final ApplicationId appId) throws YarnException {
+    String appZNode = getNodePath(appsZNode, appId.toString());
+
+    SubClusterId subClusterId = null;
+    byte[] data = get(appZNode);
+    if (data != null) {
+      try {
+        subClusterId = new SubClusterIdPBImpl(
+            SubClusterIdProto.parseFrom(data));
+      } catch (InvalidProtocolBufferException e) {
+        String errMsg = "Cannot parse application at " + appZNode;
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+    }
+    return subClusterId;
+  }
+
+  /**
+   * Put an application.
+   * @param appId Application identifier.
+   * @param subClusterId Subcluster identifier.
+   * @throws Exception If it cannot contact ZooKeeper.
+   */
+  private void putApp(final ApplicationId appId,
+      final SubClusterId subClusterId, boolean update)
+          throws YarnException {
+    String appZNode = getNodePath(appsZNode, appId.toString());
+    SubClusterIdProto proto =
+        ((SubClusterIdPBImpl)subClusterId).getProto();
+    byte[] data = proto.toByteArray();
+    put(appZNode, data, update);
+  }
+
+  /**
+   * Get the current information for a subcluster from Zookeeper.
+   * @param subclusterId Subcluster identifier.
+   * @return Subcluster information or null if it doesn't exist.
+   * @throws Exception If it cannot contact ZooKeeper.
+   */
+  private SubClusterInfo getSubclusterInfo(final SubClusterId subclusterId)
+      throws YarnException {
+    String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
+
+    SubClusterInfo policy = null;
+    byte[] data = get(memberZNode);
+    if (data != null) {
+      try {
+        policy = new SubClusterInfoPBImpl(
+            SubClusterInfoProto.parseFrom(data));
+      } catch (InvalidProtocolBufferException e) {
+        String errMsg = "Cannot parse subcluster info at " + memberZNode;
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+    }
+    return policy;
+  }
+
+  /**
+   * Put the subcluster information in Zookeeper.
+   * @param subclusterId Subcluster identifier.
+   * @param subClusterInfo Subcluster information.
+   * @throws Exception If it cannot contact ZooKeeper.
+   */
+  private void putSubclusterInfo(final SubClusterId subclusterId,
+      final SubClusterInfo subClusterInfo, final boolean update)
+          throws YarnException {
+    String memberZNode = getNodePath(membershipZNode, subclusterId.toString());
+    SubClusterInfoProto proto =
+        ((SubClusterInfoPBImpl)subClusterInfo).getProto();
+    byte[] data = proto.toByteArray();
+    put(memberZNode, data, update);
+  }
+
+  /**
+   * Get the queue policy from Zookeeper.
+   * @param queue Name of the queue.
+   * @return Subcluster policy configuration.
+   * @throws YarnException If it cannot contact ZooKeeper.
+   */
+  private SubClusterPolicyConfiguration getPolicy(final String queue)
+      throws YarnException {
+    String policyZNode = getNodePath(policiesZNode, queue);
+
+    SubClusterPolicyConfiguration policy = null;
+    byte[] data = get(policyZNode);
+    if (data != null) {
+      try {
+        policy = new SubClusterPolicyConfigurationPBImpl(
+            SubClusterPolicyConfigurationProto.parseFrom(data));
+      } catch (InvalidProtocolBufferException e) {
+        String errMsg = "Cannot parse policy at " + policyZNode;
+        FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      }
+    }
+    return policy;
+  }
+
+  /**
+   * Put the subcluster information in Zookeeper.
+   * @param queue Name of the queue.
+   * @param policy Subcluster policy configuration.
+   * @throws YarnException If it cannot contact ZooKeeper.
+   */
+  private void putPolicy(final String queue,
+      final SubClusterPolicyConfiguration policy, boolean update)
+          throws YarnException {
+    String policyZNode = getNodePath(policiesZNode, queue);
+
+    SubClusterPolicyConfigurationProto proto =
+        ((SubClusterPolicyConfigurationPBImpl)policy).getProto();
+    byte[] data = proto.toByteArray();
+    put(policyZNode, data, update);
+  }
+
+  /**
+   * Get data from a znode in Zookeeper.
+   * @param znode Path of the znode.
+   * @return Data in the znode, or null if the znode does not exist.
+   * @throws YarnException If the existence check or the read fails.
+   */
+  private byte[] get(String znode) throws YarnException {
+    boolean exists = false;
+    try {
+      exists = zkManager.exists(znode);
+    } catch (Exception e) {
+      // Report the underlying reason, as the getData failure below does
+      String errMsg = "Cannot find znode " + znode + ": " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+    if (!exists) {
+      // Absence is signalled to the caller with null, not with an exception
+      LOG.error("{} does not exist", znode);
+      return null;
+    }
+
+    byte[] data = null;
+    try {
+      data = zkManager.getData(znode);
+    } catch (Exception e) {
+      String errMsg = "Cannot get data from znode " + znode
+          + ": " + e.getMessage();
+      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+    }
+    return data;
+  }
+
+  /**
+   * Put data into a znode in Zookeeper, creating the znode first.
+   * @param znode Path of the znode.
+   * @param data Data to write.
+   * @param update When false and the znode was not created by this call,
+   *          nothing is written.
+   * @throws YarnException If the znode cannot be created or written.
+   */
+  private void put(String znode, byte[] data, boolean update)
+      throws YarnException {
+    // Try to create the znode before writing to it
+    boolean isNew = false;
+    try {
+      isNew = zkManager.create(znode);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "Cannot create znode " + znode + ": " + e.getMessage());
+    }
+    if (!isNew) {
+      LOG.debug("{} not created", znode);
+    }
+    if (!isNew && !update) {
+      LOG.info("{} already existed and we are not updating", znode);
+      return;
+    }
+
+    // Store the payload in the znode
+    try {
+      zkManager.setData(znode, data, -1);
+    } catch (Exception e) {
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Cannot write data into znode " + znode + ": " + e.getMessage());
+    }
+  }
+
+  /**
+   * Get the current time.
+   * @return Current time in milliseconds.
+   */
+  private static long getCurrentTime() {
+    Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+    return cal.getTimeInMillis();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/261f769d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
new file mode 100644
index 0000000..390b803
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestZookeeperFederationStateStore.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.impl;
+
+import java.io.IOException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for ZookeeperFederationStateStore. Each test runs against a
+ * Curator in-process ZooKeeper test server created in {@link #before()}.
+ */
+public class TestZookeeperFederationStateStore
+    extends FederationStateStoreBaseTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestZookeeperFederationStateStore.class);
+
+  /** Zookeeper test server. */
+  private static TestingServer curatorTestingServer;
+  /** Curator client connected to the test server. */
+  private static CuratorFramework curatorFramework;
+
+  /**
+   * Start a test server and a client for it, point the configuration at the
+   * server and then run the setup of the base class.
+   * @throws IOException If the test server or the client cannot be started.
+   * @throws YarnException If the setup of the base class fails.
+   */
+  @Before
+  public void before() throws IOException, YarnException {
+    try {
+      curatorTestingServer = new TestingServer();
+      curatorTestingServer.start();
+      String connectString = curatorTestingServer.getConnectString();
+      curatorFramework = CuratorFrameworkFactory.builder()
+          .connectString(connectString)
+          .retryPolicy(new RetryNTimes(100, 100))
+          .build();
+      curatorFramework.start();
+
+      Configuration conf = new YarnConfiguration();
+      conf.set(CommonConfigurationKeys.ZK_ADDRESS, connectString);
+      setConf(conf);
+    } catch (Exception e) {
+      LOG.error("Cannot initialize ZooKeeper store", e);
+      throw new IOException(e);
+    }
+
+    super.before();
+  }
+
+  /**
+   * Run the teardown of the base class, then release the client and the
+   * test server. The client and the server are released even when an
+   * earlier step throws, so that a failing test does not leak them.
+   * @throws Exception If the base class teardown or closing the client
+   *           fails.
+   */
+  @After
+  public void after() throws Exception {
+    try {
+      super.after();
+    } finally {
+      try {
+        // The fields may be unset if before() failed early
+        if (curatorFramework != null) {
+          curatorFramework.close();
+        }
+      } finally {
+        if (curatorTestingServer != null) {
+          try {
+            curatorTestingServer.stop();
+          } catch (IOException e) {
+            // Best effort: a server that fails to stop must not fail the
+            // test, but the reason should not be lost either
+            LOG.warn("Cannot stop the ZooKeeper test server", e);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Create the store under test.
+   * @return A new ZookeeperFederationStateStore.
+   */
+  @Override
+  protected FederationStateStore createStateStore() {
+    // NOTE(review): this hands the base class a fresh Configuration while
+    // before() already set one carrying the ZooKeeper address; confirm
+    // which of the two the base class uses to initialize the store
+    Configuration conf = new Configuration();
+    super.setConf(conf);
+    return new ZookeeperFederationStateStore();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/261f769d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
index ea43268..868e771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/TestFederationStateStoreFacadeRetry.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateS
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreInvalidInputException;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.zaxxer.hikari.pool.HikariPool.PoolInitializationException;
@@ -40,14 +41,18 @@ public class TestFederationStateStoreFacadeRetry {
   private int maxRetries = 4;
   private Configuration conf;
 
+  @Before
+  public void setup() {
+    conf = new Configuration();
+    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
+  }
+
   /*
    * Test to validate that FederationStateStoreRetriableException is a retriable
    * exception.
    */
   @Test
   public void testFacadeRetriableException() throws Exception {
-    conf = new Configuration();
-    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
     RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
     RetryAction action = policy.shouldRetry(
         new FederationStateStoreRetriableException(""), 0, 0, false);
@@ -66,9 +71,6 @@ public class TestFederationStateStoreFacadeRetry {
    */
   @Test
   public void testFacadeYarnException() throws Exception {
-
-    conf = new Configuration();
-    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
     RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
     RetryAction action = policy.shouldRetry(new YarnException(), 0, 0, false);
     Assert.assertEquals(RetryAction.FAIL.action, action.action);
@@ -80,8 +82,6 @@ public class TestFederationStateStoreFacadeRetry {
    */
   @Test
   public void testFacadeStateStoreException() throws Exception {
-    conf = new Configuration();
-    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
     RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
     RetryAction action = policy
         .shouldRetry(new FederationStateStoreException("Error"), 0, 0, false);
@@ -94,8 +94,6 @@ public class TestFederationStateStoreFacadeRetry {
    */
   @Test
   public void testFacadeInvalidInputException() throws Exception {
-    conf = new Configuration();
-    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
     RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
     RetryAction action = policy.shouldRetry(
         new FederationStateStoreInvalidInputException(""), 0, 0, false);
@@ -107,8 +105,6 @@ public class TestFederationStateStoreFacadeRetry {
    */
   @Test
   public void testFacadeCacheRetriableException() throws Exception {
-    conf = new Configuration();
-    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
     RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
     RetryAction action =
         policy.shouldRetry(new CacheLoaderException(""), 0, 0, false);
@@ -128,8 +124,6 @@ public class TestFederationStateStoreFacadeRetry {
   @Test
   public void testFacadePoolInitRetriableException() throws Exception {
     // PoolInitializationException is a retriable exception
-    conf = new Configuration();
-    conf.setInt(YarnConfiguration.CLIENT_FAILOVER_RETRIES, maxRetries);
     RetryPolicy policy = FederationStateStoreFacade.createRetryPolicy(conf);
     RetryAction action = policy.shouldRetry(
         new PoolInitializationException(new YarnException()), 0, 0, false);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/261f769d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
index 3e3580c..8a6c137 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
@@ -129,6 +129,7 @@ AMRMProxy, Global Policy Generator (GPG) and Router work together to make this h
 
 
 The figure shows a sequence diagram for the following job execution flow:
+
 1. The Router receives an application submission request that is complaint to the YARN Application Client Protocol.
 2. The router interrogates a routing table / policy to choose the “home RM” for the job (the policy configuration is received from the state-store on heartbeat).
 3. The router queries the membership state to determine the endpoint of the home RM.
@@ -160,15 +161,50 @@ These are common configurations that should appear in the **conf/yarn-site.xml**
 | Property | Example | Description |
 |:---- |:---- |
 |`yarn.federation.enabled` | `true` | Whether federation is enabled or not |
+|`yarn.resourcemanager.cluster-id` | `<unique-subcluster-id>` | The unique subcluster identifier for this RM (same as the one used for HA). |
+
+####State-Store:
+
+Currently, we support ZooKeeper and SQL based implementations of the state-store.
+
+**Note:** The State-Store implementation must always be overridden with one of the implementations below.
+
+ZooKeeper: one must set the ZooKeeper settings for Hadoop:
+
+| Property | Example | Description |
+|:---- |:---- |:---- |
+|`yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.ZookeeperFederationStateStore` | The type of state-store to use. |
+|`hadoop.zk.address` | `host:port` | The address for the ZooKeeper ensemble. |
+
+SQL: one must setup the following parameters:
+
+| Property | Example | Description |
+|:---- |:---- |:---- |
 |`yarn.federation.state-store.class` | `org.apache.hadoop.yarn.server.federation.store.impl.SQLFederationStateStore` | The type of state-store to use. |
 |`yarn.federation.state-store.sql.url` | `jdbc:mysql://<host>:<port>/FederationStateStore` | For SQLFederationStateStore the name of the DB where the state is stored. |
 |`yarn.federation.state-store.sql.jdbc-class` | `com.mysql.jdbc.jdbc2.optional.MysqlDataSource` | For SQLFederationStateStore the jdbc class to use. |
 |`yarn.federation.state-store.sql.username` | `<dbuser>` | For SQLFederationStateStore the username for the DB connection. |
 |`yarn.federation.state-store.sql.password` | `<dbpass>` | For SQLFederationStateStore the password for the DB connection. |
-|`yarn.resourcemanager.cluster-id` | `<unique-subcluster-id>` | The unique subcluster identifier for this RM (same as the one used for HA). |
 
+We provide scripts for MySQL and Microsoft SQL Server.
 
-Optional:
+For MySQL, one must download the latest jar version 5.x from [MVN Repository](https://mvnrepository.com/artifact/mysql/mysql-connector-java) and add it to the CLASSPATH.
+Then the DB schema is created by executing the following SQL scripts in the database:
+
+1. **sbin/FederationStateStore/MySQL/FederationStateStoreDatabase.sql**.
+2. **sbin/FederationStateStore/MySQL/FederationStateStoreUser.sql**.
+3. **sbin/FederationStateStore/MySQL/FederationStateStoreTables.sql**.
+4. **sbin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql**.
+
+In the same directory we provide scripts to drop the Stored Procedures, the Tables, the User and the Database.
+
+**Note:** the FederationStateStoreUser.sql defines a default user/password for the DB; you are **highly encouraged** to change it to a proper strong password.
+
+For SQL-Server, the process is similar, but the jdbc driver is already included.
+SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**.
+
+
+####Optional:
 
 | Property | Example | Description |
 |:---- |:---- |
@@ -236,22 +272,6 @@ Optional:
 |`yarn.federation.statestore.max-connections` | `1` | The maximum number of parallel connections from each AMRMProxy to the state-store. This value is typically lower than the router one, since we have many AMRMProxy that could burn-through many DB connections quickly. |
 |`yarn.federation.cache-ttl.secs` | `300` | The time to leave for the AMRMProxy cache. Typically larger than at the router, as the number of AMRMProxy is large, and we want to limit the load to the centralized state-store. |
 
-###State-Store:
-
-Currently, we support only SQL based implementation of state-store (ZooKeeper is in the works), i.e. either MySQL or Microsoft SQL Server.
-
-For MySQL, one must download the latest jar version 5.x from [MVN Repository](https://mvnrepository.com/artifact/mysql/mysql-connector-java) and add it to the CLASSPATH.
-Then the DB schema is created by executing the following SQL scripts in the database:
-1. **sbin/FederationStateStore/MySQL/FederationStateStoreDatabase.sql**.
-2. **sbin/FederationStateStore/MySQL/FederationStateStoreUser.sql**.
-3. **sbin/FederationStateStore/MySQL/FederationStateStoreTables.sql**.
-4. **sbin/FederationStateStore/MySQL/FederationStateStoreStoredProcs.sql**.
-In the same directory we provide scripts to drop the Stored Procedures, the Tables, the User and the Database.
-**Note:** the FederationStateStoreUser.sql defines a default user/password for the DB that you are **highly encouraged** to set this to a proper strong password.
-
-For SQL-Server, the process is similar, but the jdbc driver is already included in the pom (license allows it).
-SQL-Server scripts are located in **sbin/FederationStateStore/SQLServer/**.
-
 Running a Sample Job
 --------------------
 In order to submit jobs to a Federation cluster one must create a seperate set of configs for the client from which jobs will be submitted. In these, the **conf/yarn-site.xml** should have the following additional configurations:


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[6/8] hadoop git commit: YARN-6923. Metrics for Federation Router. (Giovanni Matteo Fumarola via asuresh)

Posted by cu...@apache.org.
YARN-6923. Metrics for Federation Router. (Giovanni Matteo Fumarola via asuresh)

(cherry picked from commit ae8fb13b312b30de50d65b5450b565d50d690e9e)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2aacb9d3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2aacb9d3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2aacb9d3

Branch: refs/heads/branch-2
Commit: 2aacb9d3fbf21308daff828639be10acbcd5e5cc
Parents: ac090b3
Author: Arun Suresh <as...@apache.org>
Authored: Mon Aug 21 22:50:24 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:09:37 2017 -0700

----------------------------------------------------------------------
 .../yarn/server/router/RouterMetrics.java       | 203 +++++++++++++++
 .../clientrm/FederationClientInterceptor.java   |  37 ++-
 .../webapp/FederationInterceptorREST.java       | 116 +++++++--
 .../yarn/server/router/TestRouterMetrics.java   | 248 +++++++++++++++++++
 .../webapp/TestFederationInterceptorREST.java   |  12 +-
 5 files changed, 593 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
new file mode 100644
index 0000000..42361a3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterMetrics.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsInfo;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.*;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.metrics2.lib.Interns.info;
+
+/**
+ * This class is for maintaining the various Router Federation Interceptor
+ * activity statistics and publishing them through the metrics interfaces.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Metrics for Router Federation Interceptor", context = "fedr")
+public final class RouterMetrics {
+
+  private static final MetricsInfo RECORD_INFO =
+      info("RouterMetrics", "Router Federation Interceptor");
+  private static AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+  // Metrics for failed operations
+  @Metric("# of applications failed to be submitted")
+  private MutableGaugeInt numAppsFailedSubmitted;
+  @Metric("# of applications failed to be created")
+  private MutableGaugeInt numAppsFailedCreated;
+  @Metric("# of applications failed to be killed")
+  private MutableGaugeInt numAppsFailedKilled;
+  @Metric("# of application reports failed to be retrieved")
+  private MutableGaugeInt numAppsFailedRetrieved;
+
+  // Aggregate metrics are shared, and don't have to be looked up per call
+  @Metric("Total number of successful Submitted apps and latency(ms)")
+  private MutableRate totalSucceededAppsSubmitted;
+  @Metric("Total number of successful Killed apps and latency(ms)")
+  private MutableRate totalSucceededAppsKilled;
+  @Metric("Total number of successful Created apps and latency(ms)")
+  private MutableRate totalSucceededAppsCreated;
+  @Metric("Total number of successful Retrieved app reports and latency(ms)")
+  private MutableRate totalSucceededAppsRetrieved;
+
+  /**
+   * Provide quantile counters for all latencies.
+   */
+  private MutableQuantiles submitApplicationLatency;
+  private MutableQuantiles getNewApplicationLatency;
+  private MutableQuantiles killApplicationLatency;
+  private MutableQuantiles getApplicationReportLatency;
+
+  private static volatile RouterMetrics INSTANCE = null;
+  private static MetricsRegistry registry;
+
+  private RouterMetrics() {
+    registry = new MetricsRegistry(RECORD_INFO);
+    registry.tag(RECORD_INFO, "Router");
+    getNewApplicationLatency = registry.newQuantiles("getNewApplicationLatency",
+        "latency of get new application", "ops", "latency", 10);
+    submitApplicationLatency = registry.newQuantiles("submitApplicationLatency",
+        "latency of submit application", "ops", "latency", 10);
+    killApplicationLatency = registry.newQuantiles("killApplicationLatency",
+        "latency of kill application", "ops", "latency", 10);
+    getApplicationReportLatency =
+        registry.newQuantiles("getApplicationReportLatency",
+            "latency of get application report", "ops", "latency", 10);
+  }
+
+  public static RouterMetrics getMetrics() {
+    if (!isInitialized.get()) {
+      synchronized (RouterMetrics.class) {
+        if (INSTANCE == null) {
+          INSTANCE = DefaultMetricsSystem.instance().register("RouterMetrics",
+              "Metrics for the Yarn Router", new RouterMetrics());
+          isInitialized.set(true);
+        }
+      }
+    }
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  synchronized static void destroy() {
+    isInitialized.set(false);
+    INSTANCE = null;
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsCreated() {
+    return totalSucceededAppsCreated.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsSubmitted() {
+    return totalSucceededAppsSubmitted.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsKilled() {
+    return totalSucceededAppsKilled.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public long getNumSucceededAppsRetrieved() {
+    return totalSucceededAppsRetrieved.lastStat().numSamples();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededAppsCreated() {
+    return totalSucceededAppsCreated.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededAppsSubmitted() {
+    return totalSucceededAppsSubmitted.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededAppsKilled() {
+    return totalSucceededAppsKilled.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public double getLatencySucceededGetAppReport() {
+    return totalSucceededAppsRetrieved.lastStat().mean();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedCreated() {
+    return numAppsFailedCreated.value();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedSubmitted() {
+    return numAppsFailedSubmitted.value();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedKilled() {
+    return numAppsFailedKilled.value();
+  }
+
+  @VisibleForTesting
+  public int getAppsFailedRetrieved() {
+    return numAppsFailedRetrieved.value();
+  }
+
+  public void succeededAppsCreated(long duration) {
+    totalSucceededAppsCreated.add(duration);
+    getNewApplicationLatency.add(duration);
+  }
+
+  public void succeededAppsSubmitted(long duration) {
+    totalSucceededAppsSubmitted.add(duration);
+    submitApplicationLatency.add(duration);
+  }
+
+  public void succeededAppsKilled(long duration) {
+    totalSucceededAppsKilled.add(duration);
+    killApplicationLatency.add(duration);
+  }
+
+  public void succeededAppsRetrieved(long duration) {
+    totalSucceededAppsRetrieved.add(duration);
+    getApplicationReportLatency.add(duration);
+  }
+
+  public void incrAppsFailedCreated() {
+    numAppsFailedCreated.incr();
+  }
+
+  public void incrAppsFailedSubmitted() {
+    numAppsFailedSubmitted.incr();
+  }
+
+  public void incrAppsFailedKilled() {
+    numAppsFailedKilled.incr();
+  }
+
+  public void incrAppsFailedRetrieved() {
+    numAppsFailedRetrieved.incr();
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 7268ebd..3a36eec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -98,7 +98,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSub
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -130,6 +133,8 @@ public class FederationClientInterceptor
   private FederationStateStoreFacade federationFacade;
   private Random rand;
   private RouterPolicyFacade policyFacade;
+  private RouterMetrics routerMetrics;
+  private final Clock clock = new MonotonicClock();
 
   @Override
   public void init(String userName) {
@@ -153,7 +158,7 @@ public class FederationClientInterceptor
 
     clientRMProxies =
         new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
-
+    routerMetrics = RouterMetrics.getMetrics();
   }
 
   @Override
@@ -220,6 +225,9 @@ public class FederationClientInterceptor
   @Override
   public GetNewApplicationResponse getNewApplication(
       GetNewApplicationRequest request) throws YarnException, IOException {
+
+    long startTime = clock.getTime();
+
     Map<SubClusterId, SubClusterInfo> subClustersActive =
         federationFacade.getSubClusters(true);
 
@@ -238,6 +246,9 @@ public class FederationClientInterceptor
       }
 
       if (response != null) {
+
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsCreated(stopTime - startTime);
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -247,6 +258,7 @@ public class FederationClientInterceptor
 
     }
 
+    routerMetrics.incrAppsFailedCreated();
     String errMsg = "Fail to create a new application.";
     LOG.error(errMsg);
     throw new YarnException(errMsg);
@@ -320,9 +332,13 @@ public class FederationClientInterceptor
   @Override
   public SubmitApplicationResponse submitApplication(
       SubmitApplicationRequest request) throws YarnException, IOException {
+
+    long startTime = clock.getTime();
+
     if (request == null || request.getApplicationSubmissionContext() == null
         || request.getApplicationSubmissionContext()
             .getApplicationId() == null) {
+      routerMetrics.incrAppsFailedSubmitted();
       RouterServerUtil
           .logAndThrowException("Missing submitApplication request or "
               + "applicationSubmissionContex information.", null);
@@ -350,6 +366,7 @@ public class FederationClientInterceptor
           subClusterId =
               federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
         } catch (YarnException e) {
+          routerMetrics.incrAppsFailedSubmitted();
           String message = "Unable to insert the ApplicationId " + applicationId
               + " into the FederationStateStore";
           RouterServerUtil.logAndThrowException(message, e);
@@ -368,6 +385,7 @@ public class FederationClientInterceptor
             LOG.info("Application " + applicationId
                 + " already submitted on SubCluster " + subClusterId);
           } else {
+            routerMetrics.incrAppsFailedSubmitted();
             RouterServerUtil.logAndThrowException(message, e);
           }
         }
@@ -388,6 +406,8 @@ public class FederationClientInterceptor
         LOG.info("Application "
             + request.getApplicationSubmissionContext().getApplicationName()
             + " with appId " + applicationId + " submitted on " + subClusterId);
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsSubmitted(stopTime - startTime);
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -396,6 +416,7 @@ public class FederationClientInterceptor
       }
     }
 
+    routerMetrics.incrAppsFailedSubmitted();
     String errMsg = "Application "
         + request.getApplicationSubmissionContext().getApplicationName()
         + " with appId " + applicationId + " failed to be submitted.";
@@ -423,7 +444,10 @@ public class FederationClientInterceptor
   public KillApplicationResponse forceKillApplication(
       KillApplicationRequest request) throws YarnException, IOException {
 
+    long startTime = clock.getTime();
+
     if (request == null || request.getApplicationId() == null) {
+      routerMetrics.incrAppsFailedKilled();
       RouterServerUtil.logAndThrowException(
           "Missing forceKillApplication request or ApplicationId.", null);
     }
@@ -434,6 +458,7 @@ public class FederationClientInterceptor
       subClusterId = federationFacade
           .getApplicationHomeSubCluster(request.getApplicationId());
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedKilled();
       RouterServerUtil.logAndThrowException("Application " + applicationId
           + " does not exist in FederationStateStore", e);
     }
@@ -447,6 +472,7 @@ public class FederationClientInterceptor
           + subClusterId);
       response = clientRMProxy.forceKillApplication(request);
     } catch (Exception e) {
+      routerMetrics.incrAppsFailedKilled();
       LOG.error("Unable to kill the application report for "
           + request.getApplicationId() + "to SubCluster "
           + subClusterId.getId(), e);
@@ -458,6 +484,8 @@ public class FederationClientInterceptor
           + applicationId + " to SubCluster " + subClusterId.getId());
     }
 
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsKilled(stopTime - startTime);
     return response;
   }
 
@@ -481,7 +509,10 @@ public class FederationClientInterceptor
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) throws YarnException, IOException {
 
+    long startTime = clock.getTime();
+
     if (request == null || request.getApplicationId() == null) {
+      routerMetrics.incrAppsFailedRetrieved();
       RouterServerUtil.logAndThrowException(
           "Missing getApplicationReport request or applicationId information.",
           null);
@@ -493,6 +524,7 @@ public class FederationClientInterceptor
       subClusterId = federationFacade
           .getApplicationHomeSubCluster(request.getApplicationId());
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedRetrieved();
       RouterServerUtil
           .logAndThrowException("Application " + request.getApplicationId()
               + " does not exist in FederationStateStore", e);
@@ -505,6 +537,7 @@ public class FederationClientInterceptor
     try {
       response = clientRMProxy.getApplicationReport(request);
     } catch (Exception e) {
+      routerMetrics.incrAppsFailedRetrieved();
       LOG.error("Unable to get the application report for "
           + request.getApplicationId() + "to SubCluster "
           + subClusterId.getId(), e);
@@ -517,6 +550,8 @@ public class FederationClientInterceptor
           + subClusterId.getId());
     }
 
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsRetrieved(stopTime - startTime);
     return response;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 8ecc19d..4c7d4b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -18,7 +18,19 @@
 
 package org.apache.hadoop.yarn.server.router.webapp;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.authorize.AuthorizationException;
@@ -36,20 +48,42 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
-import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppPriority;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppTimeoutsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationStatisticsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ClusterMetricsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.DelegationToken;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.LabelsToNodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeLabelsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsEntryList;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodeToLabelsInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NodesInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationDeleteRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationSubmissionRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ReservationUpdateRequestInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.SchedulerTypeInfo;
+import org.apache.hadoop.yarn.server.router.RouterMetrics;
 import org.apache.hadoop.yarn.server.router.RouterServerUtil;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.IOException;
-import java.util.*;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Extends the {@code AbstractRESTRequestInterceptor} class and provides an
@@ -66,6 +100,8 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   private FederationStateStoreFacade federationFacade;
   private Random rand;
   private RouterPolicyFacade policyFacade;
+  private RouterMetrics routerMetrics;
+  private final Clock clock = new MonotonicClock();
 
   private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
 
@@ -88,6 +124,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
             YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
 
     interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
+    routerMetrics = RouterMetrics.getMetrics();
   }
 
   private SubClusterId getRandomActiveSubCluster(
@@ -191,10 +228,14 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   @Override
   public Response createNewApplication(HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
+
+    long startTime = clock.getTime();
+
     Map<SubClusterId, SubClusterInfo> subClustersActive;
     try {
       subClustersActive = federationFacade.getSubClusters(true);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedCreated();
       return Response.status(Status.INTERNAL_SERVER_ERROR)
           .entity(e.getLocalizedMessage()).build();
     }
@@ -207,6 +248,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       try {
         subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
       } catch (YarnException e) {
+        routerMetrics.incrAppsFailedCreated();
         return Response.status(Status.SERVICE_UNAVAILABLE)
             .entity(e.getLocalizedMessage()).build();
       }
@@ -226,6 +268,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       }
 
       if (response != null && response.getStatus() == 200) {
+
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsCreated(stopTime - startTime);
+
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -236,6 +282,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     String errMsg = "Fail to create a new application.";
     LOG.error(errMsg);
+    routerMetrics.incrAppsFailedCreated();
     return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
   }
 
@@ -308,7 +355,11 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   public Response submitApplication(ApplicationSubmissionContextInfo newApp,
       HttpServletRequest hsr)
       throws AuthorizationException, IOException, InterruptedException {
+
+    long startTime = clock.getTime();
+
     if (newApp == null || newApp.getApplicationId() == null) {
+      routerMetrics.incrAppsFailedSubmitted();
       String errMsg = "Missing ApplicationSubmissionContextInfo or "
           + "applicationSubmissionContex information.";
       return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
@@ -318,6 +369,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     try {
       applicationId = ApplicationId.fromString(newApp.getApplicationId());
     } catch (IllegalArgumentException e) {
+      routerMetrics.incrAppsFailedSubmitted();
       return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
           .build();
     }
@@ -333,6 +385,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       try {
         subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
       } catch (YarnException e) {
+        routerMetrics.incrAppsFailedSubmitted();
         return Response.status(Status.SERVICE_UNAVAILABLE)
             .entity(e.getLocalizedMessage()).build();
       }
@@ -349,6 +402,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
           subClusterId =
               federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
         } catch (YarnException e) {
+          routerMetrics.incrAppsFailedSubmitted();
           String errMsg = "Unable to insert the ApplicationId " + applicationId
               + " into the FederationStateStore";
           return Response.status(Status.SERVICE_UNAVAILABLE)
@@ -367,6 +421,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
             subClusterIdInStateStore =
                 federationFacade.getApplicationHomeSubCluster(applicationId);
           } catch (YarnException e1) {
+            routerMetrics.incrAppsFailedSubmitted();
             return Response.status(Status.SERVICE_UNAVAILABLE)
                 .entity(e1.getLocalizedMessage()).build();
           }
@@ -374,6 +429,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
             LOG.info("Application " + applicationId
                 + " already submitted on SubCluster " + subClusterId);
           } else {
+            routerMetrics.incrAppsFailedSubmitted();
             return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
                 .build();
           }
@@ -384,6 +440,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       try {
         subClusterInfo = federationFacade.getSubCluster(subClusterId);
       } catch (YarnException e) {
+        routerMetrics.incrAppsFailedSubmitted();
         return Response.status(Status.SERVICE_UNAVAILABLE)
             .entity(e.getLocalizedMessage()).build();
       }
@@ -401,6 +458,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       if (response != null && response.getStatus() == 202) {
         LOG.info("Application " + context.getApplicationName() + " with appId "
             + applicationId + " submitted on " + subClusterId);
+
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsSubmitted(stopTime - startTime);
+
         return response;
       } else {
         // Empty response from the ResourceManager.
@@ -409,6 +470,7 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       }
     }
 
+    routerMetrics.incrAppsFailedSubmitted();
     String errMsg = "Application " + newApp.getApplicationName()
         + " with appId " + applicationId + " failed to be submitted.";
     LOG.error(errMsg);
@@ -435,10 +497,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
   public AppInfo getApp(HttpServletRequest hsr, String appId,
       Set<String> unselectedFields) {
 
+    long startTime = clock.getTime();
+
     ApplicationId applicationId = null;
     try {
       applicationId = ApplicationId.fromString(appId);
     } catch (IllegalArgumentException e) {
+      routerMetrics.incrAppsFailedRetrieved();
       return null;
     }
 
@@ -448,16 +513,23 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       subClusterId =
           federationFacade.getApplicationHomeSubCluster(applicationId);
       if (subClusterId == null) {
+        routerMetrics.incrAppsFailedRetrieved();
         return null;
       }
       subClusterInfo = federationFacade.getSubCluster(subClusterId);
     } catch (YarnException e) {
+      routerMetrics.incrAppsFailedRetrieved();
       return null;
     }
 
-    return getOrCreateInterceptorForSubCluster(subClusterId,
+    AppInfo response = getOrCreateInterceptorForSubCluster(subClusterId,
         subClusterInfo.getRMWebServiceAddress()).getApp(hsr, appId,
             unselectedFields);
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsRetrieved(stopTime - startTime);
+
+    return response;
   }
 
   /**
@@ -481,23 +553,37 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
       String appId) throws AuthorizationException, YarnException,
       InterruptedException, IOException {
 
+    long startTime = clock.getTime();
+
     ApplicationId applicationId = null;
     try {
       applicationId = ApplicationId.fromString(appId);
     } catch (IllegalArgumentException e) {
+      routerMetrics.incrAppsFailedKilled();
       return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
           .build();
     }
 
-    SubClusterId subClusterId =
-        federationFacade.getApplicationHomeSubCluster(applicationId);
-
-    SubClusterInfo subClusterInfo =
-        federationFacade.getSubCluster(subClusterId);
+    SubClusterInfo subClusterInfo = null;
+    SubClusterId subClusterId = null;
+    try {
+      subClusterId =
+          federationFacade.getApplicationHomeSubCluster(applicationId);
+      subClusterInfo = federationFacade.getSubCluster(subClusterId);
+    } catch (YarnException e) {
+      routerMetrics.incrAppsFailedKilled();
+      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+          .build();
+    }
 
-    return getOrCreateInterceptorForSubCluster(subClusterId,
+    Response response = getOrCreateInterceptorForSubCluster(subClusterId,
         subClusterInfo.getRMWebServiceAddress()).updateAppState(targetState,
             hsr, appId);
+
+    long stopTime = clock.getTime();
+    routerMetrics.succeededAppsRetrieved(stopTime - startTime);
+
+    return response;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
new file mode 100644
index 0000000..3cdafd8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/TestRouterMetrics.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.router;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class validates the correctness of Router Federation Interceptor
+ * Metrics.
+ */
+public class TestRouterMetrics {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestRouterMetrics.class);
+
+  // All the operations in the bad subcluster fail.
+  private MockBadSubCluster badSubCluster = new MockBadSubCluster();
+  // All the operations in the good subcluster succeed.
+  private MockGoodSubCluster goodSubCluster = new MockGoodSubCluster();
+
+  private static RouterMetrics metrics = RouterMetrics.getMetrics();
+
+  @BeforeClass
+  public static void init() {
+
+    LOG.info("Test: aggregate metrics are initialized correctly");
+
+    Assert.assertEquals(0, metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(0, metrics.getNumSucceededAppsRetrieved());
+
+    Assert.assertEquals(0, metrics.getAppsFailedCreated());
+    Assert.assertEquals(0, metrics.getAppsFailedSubmitted());
+    Assert.assertEquals(0, metrics.getAppsFailedKilled());
+    Assert.assertEquals(0, metrics.getAppsFailedRetrieved());
+
+    LOG.info("Test: aggregate metrics are updated correctly");
+  }
+
+  /**
+   * This test validates the correctness of the metric: Created Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsCreated() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsCreated();
+
+    goodSubCluster.getNewApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsCreated(), 0);
+
+    goodSubCluster.getNewApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsCreated());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsCreated(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to create Apps.
+   */
+  @Test
+  public void testAppsFailedCreated() {
+
+    long totalBadbefore = metrics.getAppsFailedCreated();
+
+    badSubCluster.getNewApplication();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedCreated());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Submitted Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsSubmitted() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsSubmitted();
+
+    goodSubCluster.submitApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsSubmitted(), 0);
+
+    goodSubCluster.submitApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsSubmitted());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsSubmitted(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to submit Apps.
+   */
+  @Test
+  public void testAppsFailedSubmitted() {
+
+    long totalBadbefore = metrics.getAppsFailedSubmitted();
+
+    badSubCluster.submitApplication();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedSubmitted());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Killed Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsKilled() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsKilled();
+
+    goodSubCluster.forceKillApplication(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(100, metrics.getLatencySucceededAppsKilled(), 0);
+
+    goodSubCluster.forceKillApplication(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsKilled());
+    Assert.assertEquals(150, metrics.getLatencySucceededAppsKilled(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to kill Apps.
+   */
+  @Test
+  public void testAppsFailedKilled() {
+
+    long totalBadbefore = metrics.getAppsFailedKilled();
+
+    badSubCluster.forceKillApplication();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedKilled());
+  }
+
+  /**
+   * This test validates the correctness of the metric: Retrieved Apps
+   * successfully.
+   */
+  @Test
+  public void testSucceededAppsReport() {
+
+    long totalGoodBefore = metrics.getNumSucceededAppsRetrieved();
+
+    goodSubCluster.getApplicationReport(100);
+
+    Assert.assertEquals(totalGoodBefore + 1,
+        metrics.getNumSucceededAppsRetrieved());
+    Assert.assertEquals(100, metrics.getLatencySucceededGetAppReport(), 0);
+
+    goodSubCluster.getApplicationReport(200);
+
+    Assert.assertEquals(totalGoodBefore + 2,
+        metrics.getNumSucceededAppsRetrieved());
+    Assert.assertEquals(150, metrics.getLatencySucceededGetAppReport(), 0);
+  }
+
+  /**
+   * This test validates the correctness of the metric: Failed to retrieve Apps.
+   */
+  @Test
+  public void testAppsReportFailed() {
+
+    long totalBadbefore = metrics.getAppsFailedRetrieved();
+
+    badSubCluster.getApplicationReport();
+
+    Assert.assertEquals(totalBadbefore + 1, metrics.getAppsFailedRetrieved());
+  }
+
+  // Records failures for all calls
+  private class MockBadSubCluster {
+    public void getNewApplication() {
+      LOG.info("Mocked: failed getNewApplication call");
+      metrics.incrAppsFailedCreated();
+    }
+
+    public void submitApplication() {
+      LOG.info("Mocked: failed submitApplication call");
+      metrics.incrAppsFailedSubmitted();
+    }
+
+    public void forceKillApplication() {
+      LOG.info("Mocked: failed forceKillApplication call");
+      metrics.incrAppsFailedKilled();
+    }
+
+    public void getApplicationReport() {
+      LOG.info("Mocked: failed getApplicationReport call");
+      metrics.incrAppsFailedRetrieved();
+    }
+  }
+
+  // Records successes for all calls
+  private class MockGoodSubCluster {
+    public void getNewApplication(long duration) {
+      LOG.info("Mocked: successful getNewApplication call with duration {}",
+          duration);
+      metrics.succeededAppsCreated(duration);
+    }
+
+    public void submitApplication(long duration) {
+      LOG.info("Mocked: successful submitApplication call with duration {}",
+          duration);
+      metrics.succeededAppsSubmitted(duration);
+    }
+
+    public void forceKillApplication(long duration) {
+      LOG.info("Mocked: successful forceKillApplication call with duration {}",
+          duration);
+      metrics.succeededAppsKilled(duration);
+    }
+
+    public void getApplicationReport(long duration) {
+      LOG.info("Mocked: successful getApplicationReport call with duration {}",
+          duration);
+      metrics.succeededAppsRetrieved(duration);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2aacb9d3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
index d918149..fb6cdd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -276,13 +276,11 @@ public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
     ApplicationId appId =
         ApplicationId.newInstance(System.currentTimeMillis(), 1);
     AppState appState = new AppState("KILLED");
-    try {
-      interceptor.updateAppState(appState, null, appId.toString());
-      Assert.fail();
-    } catch (YarnException e) {
-      Assert.assertTrue(
-          e.getMessage().equals("Application " + appId + " does not exist"));
-    }
+
+    Response response =
+        interceptor.updateAppState(appState, null, appId.toString());
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[8/8] hadoop git commit: [YARN FEDERATION BACKPORT] Fixing more Java 1.7 compilation issues

Posted by cu...@apache.org.
[YARN FEDERATION BACKPORT] Fixing more Java 1.7 compilation issues


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7cd9018b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7cd9018b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7cd9018b

Branch: refs/heads/branch-2
Commit: 7cd9018b136f5f6de7bb819909d74e4cab8fb19d
Parents: 88b32ed
Author: Carlo Curino <cu...@apache.org>
Authored: Thu Sep 21 18:19:36 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:19:36 2017 -0700

----------------------------------------------------------------------
 .../server/router/webapp/FederationInterceptorREST.java | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7cd9018b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index 3a91e35..15caf0a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -622,11 +622,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
    * operation.
    */
   @Override
-  public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
-      Set<String> statesQuery, String finalStatusQuery, String userQuery,
-      String queueQuery, String count, String startedBegin, String startedEnd,
-      String finishBegin, String finishEnd, Set<String> applicationTypes,
-      Set<String> applicationTags, Set<String> unselectedFields) {
+  public AppsInfo getApps(final HttpServletRequest hsr, final String stateQuery,
+      final Set<String> statesQuery, final String finalStatusQuery,
+      final String userQuery, final String queueQuery, final String count,
+      final String startedBegin, final String startedEnd,
+      final String finishBegin, final String finishEnd,
+      final Set<String> applicationTypes, final Set<String> applicationTags,
+      final Set<String> unselectedFields) {
     AppsInfo apps = new AppsInfo();
     long startTime = clock.getTime();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/8] hadoop git commit: YARN-6896. Federation: routing REST invocations transparently to multiple RMs (part 1 - basic execution). (Contributed by Giovanni Matteo Fumarola via curino)

Posted by cu...@apache.org.
YARN-6896. Federation: routing REST invocations transparently to multiple RMs (part 1 - basic execution). (Contributed by Giovanni Matteo Fumarola via curino)

(cherry picked from commit cc59b5fb26ccf58dffcd8850fa12ec65250f127d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8220b19a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8220b19a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8220b19a

Branch: refs/heads/branch-2
Commit: 8220b19af70744b27c265d604ab3993e0c7659c5
Parents: a1ee4ad
Author: Carlo Curino <cu...@apache.org>
Authored: Fri Aug 11 15:58:01 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Thu Sep 21 18:09:07 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |  10 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   2 +
 .../webapp/DefaultRequestInterceptorREST.java   |  16 +-
 .../webapp/FederationInterceptorREST.java       | 750 +++++++++++++++++++
 .../webapp/BaseRouterWebServicesTest.java       |  37 +-
 .../MockDefaultRequestInterceptorREST.java      | 136 ++++
 .../webapp/TestFederationInterceptorREST.java   | 379 ++++++++++
 .../TestFederationInterceptorRESTRetry.java     | 274 +++++++
 .../TestableFederationInterceptorREST.java      |  54 ++
 .../src/site/markdown/Federation.md             |   2 +-
 10 files changed, 1646 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 34374cf..c34c076 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2653,6 +2653,16 @@ public class YarnConfiguration extends Configuration {
       "org.apache.hadoop.yarn.server.router.webapp."
           + "DefaultRequestInterceptorREST";
 
+  /**
+   * The interceptor class used in FederationInterceptorREST to communicate with
+   * each SubCluster.
+   */
+  public static final String ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS =
+      ROUTER_WEBAPP_PREFIX + "default-interceptor-class";
+  public static final String DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS =
+      "org.apache.hadoop.yarn.server.router.webapp."
+          + "DefaultRequestInterceptorREST";
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 56fb578..d6c619d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -83,6 +83,8 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
         .add(YarnConfiguration.ROUTER_CLIENTRM_ADDRESS);
     configurationPropsToSkipCompare
         .add(YarnConfiguration.ROUTER_RMADMIN_ADDRESS);
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
 
     // Federation policies configs to be ignored
     configurationPropsToSkipCompare

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
index aa8e3eb..abd8ca6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/DefaultRequestInterceptorREST.java
@@ -30,6 +30,7 @@ import javax.ws.rs.core.Response;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ActivitiesInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppActivitiesInfo;
@@ -66,10 +67,23 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
  * implementation that simply forwards the client requests to the resource
  * manager.
  */
-public final class DefaultRequestInterceptorREST
+public class DefaultRequestInterceptorREST
     extends AbstractRESTRequestInterceptor {
 
   private String webAppAddress;
+  private SubClusterId subClusterId = null;
+
+  public void setWebAppAddress(String webAppAddress) {
+    this.webAppAddress = webAppAddress;
+  }
+
+  protected void setSubClusterId(SubClusterId scId) {
+    this.subClusterId = scId;
+  }
+
+  protected SubClusterId getSubClusterId() {
+    return this.subClusterId;
+  }
 
   @Override
   public void init(String user) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
new file mode 100644
index 0000000..8ecc19d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -0,0 +1,750 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebAppUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
+import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Extends the {@code AbstractRESTRequestInterceptor} class and provides an
+ * implementation for federation of YARN RM and scaling an application across
+ * multiple YARN SubClusters. All the federation specific implementation is
+ * encapsulated in this class. This is always the last interceptor in the chain.
+ */
+public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationInterceptorREST.class);
+
+  private int numSubmitRetries;
+  private FederationStateStoreFacade federationFacade;
+  private Random rand;
+  private RouterPolicyFacade policyFacade;
+
+  private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
+
+  @Override
+  public void init(String user) {
+    federationFacade = FederationStateStoreFacade.getInstance();
+    rand = new Random(System.currentTimeMillis());
+
+    final Configuration conf = this.getConf();
+
+    try {
+      policyFacade = new RouterPolicyFacade(conf, federationFacade,
+          this.federationFacade.getSubClusterResolver(), null);
+    } catch (FederationPolicyInitializationException e) {
+      LOG.error(e.getMessage());
+    }
+
+    numSubmitRetries =
+        conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
+
+    interceptors = new HashMap<SubClusterId, DefaultRequestInterceptorREST>();
+  }
+
+  private SubClusterId getRandomActiveSubCluster(
+      Map<SubClusterId, SubClusterInfo> activeSubclusters,
+      List<SubClusterId> blackListSubClusters) throws YarnException {
+
+    if (activeSubclusters == null || activeSubclusters.size() < 1) {
+      RouterServerUtil.logAndThrowException(
+          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
+    }
+    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+
+    FederationPolicyUtils.validateSubClusterAvailability(list,
+        blackListSubClusters);
+
+    if (blackListSubClusters != null) {
+
+      // Remove the blacklisted ones from the StateStore's active SubClusters
+      for (SubClusterId scId : blackListSubClusters) {
+        list.remove(scId);
+      }
+    }
+
+    return list.get(rand.nextInt(list.size()));
+  }
+
+  @VisibleForTesting
+  protected DefaultRequestInterceptorREST getInterceptorForSubCluster(
+      SubClusterId subClusterId) {
+    if (interceptors.containsKey(subClusterId)) {
+      return interceptors.get(subClusterId);
+    } else {
+      LOG.error("The interceptor for SubCluster " + subClusterId
+          + " does not exist in the cache.");
+      return null;
+    }
+  }
+
+  private DefaultRequestInterceptorREST createInterceptorForSubCluster(
+      SubClusterId subClusterId, String webAppAddress) {
+
+    final Configuration conf = this.getConf();
+
+    String interceptorClassName =
+        conf.get(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
+            YarnConfiguration.DEFAULT_ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS);
+    DefaultRequestInterceptorREST interceptorInstance = null;
+    try {
+      Class<?> interceptorClass = conf.getClassByName(interceptorClassName);
+      if (DefaultRequestInterceptorREST.class
+          .isAssignableFrom(interceptorClass)) {
+        interceptorInstance = (DefaultRequestInterceptorREST) ReflectionUtils
+            .newInstance(interceptorClass, conf);
+
+      } else {
+        throw new YarnRuntimeException(
+            "Class: " + interceptorClassName + " not instance of "
+                + DefaultRequestInterceptorREST.class.getCanonicalName());
+      }
+    } catch (ClassNotFoundException e) {
+      throw new YarnRuntimeException(
+          "Could not instantiate ApplicationMasterRequestInterceptor: "
+              + interceptorClassName,
+          e);
+    }
+
+    interceptorInstance.setWebAppAddress(webAppAddress);
+    interceptorInstance.setSubClusterId(subClusterId);
+    interceptors.put(subClusterId, interceptorInstance);
+    return interceptorInstance;
+  }
+
+  @VisibleForTesting
+  protected DefaultRequestInterceptorREST getOrCreateInterceptorForSubCluster(
+      SubClusterId subClusterId, String webAppAddress) {
+    DefaultRequestInterceptorREST interceptor =
+        getInterceptorForSubCluster(subClusterId);
+    if (interceptor == null) {
+      interceptor = createInterceptorForSubCluster(subClusterId, webAppAddress);
+    }
+    return interceptor;
+  }
+
+  /**
+   * Yarn Router forwards every getNewApplication request to any RM. During
+   * this operation there will be no communication with the State Store. The
+   * Router will forward the requests to any SubCluster. The Router will retry
+   * to submit the request on #numSubmitRetries different SubClusters. The
+   * SubClusters are randomly chosen from the active ones.
+   * <p>
+   * Possible failures and behaviors:
+   * <p>
+   * Client: identical behavior as {@code RMWebServices}.
+   * <p>
+   * Router: the Client will time out and resubmit.
+   * <p>
+   * ResourceManager: the Router will time out and contact another RM.
+   * <p>
+   * StateStore: not in the execution.
+   */
+  @Override
+  public Response createNewApplication(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    Map<SubClusterId, SubClusterInfo> subClustersActive;
+    try {
+      subClustersActive = federationFacade.getSubClusters(true);
+    } catch (YarnException e) {
+      return Response.status(Status.INTERNAL_SERVER_ERROR)
+          .entity(e.getLocalizedMessage()).build();
+    }
+
+    List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+
+      SubClusterId subClusterId;
+      try {
+        subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
+      } catch (YarnException e) {
+        return Response.status(Status.SERVICE_UNAVAILABLE)
+            .entity(e.getLocalizedMessage()).build();
+      }
+
+      LOG.debug(
+          "getNewApplication try #" + i + " on SubCluster " + subClusterId);
+
+      DefaultRequestInterceptorREST interceptor =
+          getOrCreateInterceptorForSubCluster(subClusterId,
+              subClustersActive.get(subClusterId).getRMWebServiceAddress());
+      Response response = null;
+      try {
+        response = interceptor.createNewApplication(hsr);
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new ApplicationId in SubCluster "
+            + subClusterId.getId(), e);
+      }
+
+      if (response != null && response.getStatus() == 200) {
+        return response;
+      } else {
+        // Empty response from the ResourceManager.
+        // Blacklist this subcluster for this request.
+        blacklist.add(subClusterId);
+      }
+    }
+
+    String errMsg = "Fail to create a new application.";
+    LOG.error(errMsg);
+    return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
+  }
+
+  /**
+   * Today, in YARN there are no checks of any applicationId submitted.
+   * <p>
+   * Base scenarios:
+   * <p>
+   * The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into
+   * StateStore with the selected SubCluster (e.g. SC1) and the appId. • The
+   * State Store replies with the selected SubCluster (e.g. SC1). • The Router
+   * submits the request to the selected SubCluster.
+   * <p>
+   * In case of State Store failure:
+   * <p>
+   * The client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • Because the
+   * State Store is down, the Router times out and retries depending on the
+   * FederationFacade settings. • The Router replies to the client with an error
+   * message.
+   * <p>
+   * If State Store fails after inserting the tuple: identical behavior as
+   * {@code RMWebServices}.
+   * <p>
+   * In case of Router failure:
+   * <p>
+   * Scenario 1 – Crash before submission to the ResourceManager
+   * <p>
+   * The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
+   * crashes. • The Client times out and resubmits the application. • The Router
+   * selects one SubCluster to forward the request. • The Router inserts a tuple
+   * into State Store with the selected SubCluster (e.g. SC2) and the appId. •
+   * Because the tuple is already inserted in the State Store, it returns the
+   * previous selected SubCluster (e.g. SC1). • The Router submits the request
+   * to the selected SubCluster (e.g. SC1).
+   * <p>
+   * Scenario 2 – Crash after submission to the ResourceManager
+   * <p>
+   * • The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
+   * submits the request to the selected SubCluster. • The Router crashes. • The
+   * Client times out and resubmits the application. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC2) and the appId. • The State
+   * Store replies with the selected SubCluster (e.g. SC1). • The Router submits
+   * the request to the selected SubCluster (e.g. SC1). When a client re-submits
+   * the same application to the same RM, it does not raise an exception and
+   * replies with operation successful message.
+   * <p>
+   * In case of Client failure: identical behavior as {@code RMWebServices}.
+   * <p>
+   * In case of ResourceManager failure:
+   * <p>
+   * The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
+   * submits the request to the selected SubCluster. • The entire SubCluster is
+   * down – all the RMs in HA or the master RM is not reachable. • The Router
+   * times out. • The Router selects a new SubCluster to forward the request. •
+   * The Router updates the tuple in State Store with the selected SubCluster
+   * (e.g. SC2) and the appId. • The State Store replies with OK answer. • The
+   * Router submits the request to the selected SubCluster (e.g. SC2).
+   */
+  @Override
+  public Response submitApplication(ApplicationSubmissionContextInfo newApp,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    // Reject requests that carry no submission context or no application id.
+    if (newApp == null || newApp.getApplicationId() == null) {
+      String errMsg = "Missing ApplicationSubmissionContextInfo or "
+          + "applicationSubmissionContext information.";
+      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
+    }
+
+    ApplicationId applicationId = null;
+    try {
+      applicationId = ApplicationId.fromString(newApp.getApplicationId());
+    } catch (IllegalArgumentException e) {
+      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage())
+          .build();
+    }
+
+    // SubClusters that already failed to accept this request; they are passed
+    // to the policy so that it can pick a different home on the next try.
+    List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+
+      ApplicationSubmissionContext context =
+          RMWebAppUtil.createAppSubmissionContext(newApp, this.getConf());
+
+      SubClusterId subClusterId = null;
+      try {
+        subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
+      } catch (YarnException e) {
+        return Response.status(Status.SERVICE_UNAVAILABLE)
+            .entity(e.getLocalizedMessage()).build();
+      }
+      LOG.info("submitApplication appId " + applicationId + " try #" + i
+          + " on SubCluster " + subClusterId);
+
+      ApplicationHomeSubCluster appHomeSubCluster =
+          ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+
+      if (i == 0) {
+        try {
+          // persist the mapping of applicationId and the subClusterId which has
+          // been selected as its home; the value handed back by the facade is
+          // the one used from here on
+          subClusterId =
+              federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String errMsg = "Unable to insert the ApplicationId " + applicationId
+              + " into the FederationStateStore";
+          return Response.status(Status.SERVICE_UNAVAILABLE)
+              .entity(errMsg + " " + e.getLocalizedMessage()).build();
+        }
+      } else {
+        try {
+          // update the mapping of applicationId and the home subClusterId to
+          // the new subClusterId we have selected
+          federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String errMsg = "Unable to update the ApplicationId " + applicationId
+              + " into the FederationStateStore";
+          SubClusterId subClusterIdInStateStore;
+          try {
+            subClusterIdInStateStore =
+                federationFacade.getApplicationHomeSubCluster(applicationId);
+          } catch (YarnException e1) {
+            return Response.status(Status.SERVICE_UNAVAILABLE)
+                .entity(e1.getLocalizedMessage()).build();
+          }
+          // Compare by value: the id read back from the state store is a
+          // different object than the one returned by the policy, so a
+          // reference comparison would treat a matching entry as a mismatch.
+          if (subClusterId != null
+              && subClusterId.equals(subClusterIdInStateStore)) {
+            LOG.info("Application " + applicationId
+                + " already submitted on SubCluster " + subClusterId);
+          } else {
+            return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg)
+                .build();
+          }
+        }
+      }
+
+      SubClusterInfo subClusterInfo;
+      try {
+        subClusterInfo = federationFacade.getSubCluster(subClusterId);
+      } catch (YarnException e) {
+        return Response.status(Status.SERVICE_UNAVAILABLE)
+            .entity(e.getLocalizedMessage()).build();
+      }
+
+      // A failure while talking to the SubCluster is only logged: the
+      // SubCluster gets blacklisted below and the next iteration retries.
+      Response response = null;
+      try {
+        response = getOrCreateInterceptorForSubCluster(subClusterId,
+            subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp,
+                hsr);
+      } catch (Exception e) {
+        LOG.warn("Unable to submit the application " + applicationId
+            + " to SubCluster " + subClusterId.getId(), e);
+      }
+
+      if (response != null && response.getStatus() == 202) {
+        LOG.info("Application " + context.getApplicationName() + " with appId "
+            + applicationId + " submitted on " + subClusterId);
+        return response;
+      } else {
+        // No response, or a status other than 202, from the ResourceManager.
+        // Blacklist this subcluster for this request.
+        blacklist.add(subClusterId);
+      }
+    }
+
+    String errMsg = "Application " + newApp.getApplicationName()
+        + " with appId " + applicationId + " failed to be submitted.";
+    LOG.error(errMsg);
+    return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build();
+  }
+
+  /**
+   * The Yarn Router will forward to the respective Yarn RM in which the AM is
+   * running.
+   * <p>
+   * Possible failure:
+   * <p>
+   * Client: identical behavior as {@code RMWebServices}.
+   * <p>
+   * Router: the Client will timeout and resubmit the request.
+   * <p>
+   * ResourceManager: the Router will timeout and the call will fail.
+   * <p>
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
+  @Override
+  public AppInfo getApp(HttpServletRequest hsr, String appId,
+      Set<String> unselectedFields) {
+
+    // A malformed application id cannot be routed: answer with no result.
+    ApplicationId parsedAppId;
+    try {
+      parsedAppId = ApplicationId.fromString(appId);
+    } catch (IllegalArgumentException e) {
+      return null;
+    }
+
+    // Resolve the home SubCluster of the application and its descriptor; a
+    // missing mapping or a failure of the federation facade yields no result.
+    SubClusterId homeSubClusterId;
+    SubClusterInfo homeSubClusterInfo;
+    try {
+      homeSubClusterId =
+          federationFacade.getApplicationHomeSubCluster(parsedAppId);
+      if (homeSubClusterId == null) {
+        return null;
+      }
+      homeSubClusterInfo = federationFacade.getSubCluster(homeSubClusterId);
+    } catch (YarnException e) {
+      return null;
+    }
+
+    // Hand the original request over to the interceptor of that SubCluster.
+    return getOrCreateInterceptorForSubCluster(homeSubClusterId,
+        homeSubClusterInfo.getRMWebServiceAddress())
+            .getApp(hsr, appId, unselectedFields);
+  }
+
+  /**
+   * The Yarn Router will forward to the respective Yarn RM in which the AM is
+   * running.
+   * <p>
+   * Possible failures and behaviors:
+   * <p>
+   * Client: identical behavior as {@code RMWebServices}.
+   * <p>
+   * Router: the Client will timeout and resubmit the request.
+   * <p>
+   * ResourceManager: the Router will timeout and the call will fail.
+   * <p>
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
+  @Override
+  public Response updateAppState(AppState targetState, HttpServletRequest hsr,
+      String appId) throws AuthorizationException, YarnException,
+      InterruptedException, IOException {
+
+    // An application id that cannot be parsed is reported as a client error.
+    ApplicationId parsedAppId;
+    try {
+      parsedAppId = ApplicationId.fromString(appId);
+    } catch (IllegalArgumentException e) {
+      return Response.status(Status.BAD_REQUEST)
+          .entity(e.getLocalizedMessage()).build();
+    }
+
+    // Look up where the application lives, then forward the request to the
+    // interceptor of that SubCluster.
+    SubClusterId homeSubClusterId =
+        federationFacade.getApplicationHomeSubCluster(parsedAppId);
+    SubClusterInfo homeSubClusterInfo =
+        federationFacade.getSubCluster(homeSubClusterId);
+
+    return getOrCreateInterceptorForSubCluster(homeSubClusterId,
+        homeSubClusterInfo.getRMWebServiceAddress())
+            .updateAppState(targetState, hsr, appId);
+  }
+
+  /** Alias of {@link #getClusterInfo()}: simply returns its result. */
+  @Override
+  public ClusterInfo get() {
+    return getClusterInfo();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public ClusterInfo getClusterInfo() {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public ClusterMetricsInfo getClusterMetricsInfo() {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public SchedulerTypeInfo getSchedulerInfo() {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public String dumpSchedulerLogs(String time, HttpServletRequest hsr)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public NodesInfo getNodes(String states) {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public NodeInfo getNode(String nodeId) {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public ActivitiesInfo getActivities(HttpServletRequest hsr, String nodeId) {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public AppActivitiesInfo getAppActivities(HttpServletRequest hsr,
+      String appId, String time) {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public ApplicationStatisticsInfo getAppStatistics(HttpServletRequest hsr,
+      Set<String> stateQueries, Set<String> typeQueries) {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public AppsInfo getApps(HttpServletRequest hsr, String stateQuery,
+      Set<String> statesQuery, String finalStatusQuery, String userQuery,
+      String queueQuery, String count, String startedBegin, String startedEnd,
+      String finishBegin, String finishEnd, Set<String> applicationTypes,
+      Set<String> applicationTags, Set<String> unselectedFields) {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public AppState getAppState(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public NodeToLabelsInfo getNodeToLabels(HttpServletRequest hsr)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public LabelsToNodesInfo getLabelsToNodes(Set<String> labels)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response replaceLabelsOnNodes(NodeToLabelsEntryList newNodeToLabels,
+      HttpServletRequest hsr) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response replaceLabelsOnNode(Set<String> newNodeLabelsName,
+      HttpServletRequest hsr, String nodeId) throws Exception {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public NodeLabelsInfo getClusterNodeLabels(HttpServletRequest hsr)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response addToClusterNodeLabels(NodeLabelsInfo newNodeLabels,
+      HttpServletRequest hsr) throws Exception {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response removeFromCluserNodeLabels(Set<String> oldNodeLabels,
+      HttpServletRequest hsr) throws Exception {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public NodeLabelsInfo getLabelsOnNode(HttpServletRequest hsr, String nodeId)
+      throws IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public AppPriority getAppPriority(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response updateApplicationPriority(AppPriority targetPriority,
+      HttpServletRequest hsr, String appId) throws AuthorizationException,
+      YarnException, InterruptedException, IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public AppQueue getAppQueue(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response updateAppQueue(AppQueue targetQueue, HttpServletRequest hsr,
+      String appId) throws AuthorizationException, YarnException,
+      InterruptedException, IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response postDelegationToken(DelegationToken tokenData,
+      HttpServletRequest hsr) throws AuthorizationException, IOException,
+      InterruptedException, Exception {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response postDelegationTokenExpiration(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response cancelDelegationToken(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException,
+      Exception {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response createNewReservation(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response submitReservation(ReservationSubmissionRequestInfo resContext,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response updateReservation(ReservationUpdateRequestInfo resContext,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response deleteReservation(ReservationDeleteRequestInfo resContext,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response listReservation(String queue, String reservationId,
+      long startTime, long endTime, boolean includeResourceAllocations,
+      HttpServletRequest hsr) throws Exception {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public AppTimeoutInfo getAppTimeout(HttpServletRequest hsr, String appId,
+      String type) throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public AppTimeoutsInfo getAppTimeouts(HttpServletRequest hsr, String appId)
+      throws AuthorizationException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public Response updateApplicationTimeout(AppTimeoutInfo appTimeout,
+      HttpServletRequest hsr, String appId) throws AuthorizationException,
+      YarnException, InterruptedException, IOException {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public AppAttemptsInfo getAppAttempts(HttpServletRequest hsr, String appId) {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public AppAttemptInfo getAppAttempt(HttpServletRequest req,
+      HttpServletResponse res, String appId, String appAttemptId) {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public ContainersInfo getContainers(HttpServletRequest req,
+      HttpServletResponse res, String appId, String appAttemptId) {
+    throw new NotImplementedException();
+  }
+
+  /** Unsupported in this interceptor: throws NotImplementedException. */
+  @Override
+  public ContainerInfo getContainer(HttpServletRequest req,
+      HttpServletResponse res, String appId, String appAttemptId,
+      String containerId) {
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Always throws {@link YarnRuntimeException}: as the error message states,
+   * this interceptor is expected to be the last one in the chain, so it does
+   * not accept a successor.
+   *
+   * @param next the interceptor the caller tried to chain after this one;
+   *          it is never used
+   */
+  @Override
+  public void setNextInterceptor(RESTRequestInterceptor next) {
+    throw new YarnRuntimeException("setNextInterceptor is being called on "
+        + "FederationInterceptorREST, which should be the last one "
+        + "in the chain. Check if the interceptor pipeline configuration "
+        + "is correct");
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
index 223690f..7d42084 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/BaseRouterWebServicesTest.java
@@ -27,6 +27,7 @@ import java.security.PrivilegedExceptionAction;
 import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -75,30 +76,34 @@ public abstract class BaseRouterWebServicesTest {
   private RouterWebServices routerWebService;
 
   @Before
-  public void setup() {
-    conf = new YarnConfiguration();
+  public void setUp() {
+    this.conf = createConfiguration();
 
+    router = spy(new Router());
+    Mockito.doNothing().when(router).startWepApp();
+    routerWebService = new RouterWebServices(router, conf);
+    routerWebService.setResponse(mock(HttpServletResponse.class));
+
+    router.init(conf);
+    router.start();
+  }
+
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration config = new YarnConfiguration();
     String mockPassThroughInterceptorClass =
         PassThroughRESTRequestInterceptor.class.getName();
 
     // Create a request intercepter pipeline for testing. The last one in the
     // chain will call the mock resource manager. The others in the chain will
     // simply forward it to the next one in the chain
-    conf.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
+    config.set(YarnConfiguration.ROUTER_WEBAPP_INTERCEPTOR_CLASS_PIPELINE,
         mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
             + "," + mockPassThroughInterceptorClass + ","
             + MockRESTRequestInterceptor.class.getName());
 
-    conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+    config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
         TEST_MAX_CACHE_SIZE);
-
-    router = spy(new Router());
-    Mockito.doNothing().when(router).startWepApp();
-    routerWebService = new RouterWebServices(router, conf);
-    routerWebService.setResponse(mock(HttpServletResponse.class));
-
-    router.init(conf);
-    router.start();
+    return config;
   }
 
   @After
@@ -108,6 +113,14 @@ public abstract class BaseRouterWebServicesTest {
     }
   }
 
+  /** Replaces the held configuration with a fresh createConfiguration(). */
+  public void setUpConfig() {
+    this.conf = createConfiguration();
+  }
+
+  /** @return the configuration currently held by this test instance */
+  protected Configuration getConf() {
+    return this.conf;
+  }
+
   protected RouterWebServices getRouterWebServices() {
     Assert.assertNotNull(this.routerWebService);
     return this.routerWebService;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
new file mode 100644
index 0000000..91e601e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/MockDefaultRequestInterceptorREST.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.*;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class mocks the RESTRequestInterceptor.
+ * <p>
+ * Requests are answered from in-memory state: the ids of submitted
+ * applications are remembered so that later lookups and kills can tell known
+ * applications from unknown ones, and a "running" flag lets tests simulate a
+ * stopped RM.
+ */
+public class MockDefaultRequestInterceptorREST
+    extends DefaultRequestInterceptorREST {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MockDefaultRequestInterceptorREST.class);
+  // Sequence used for the id part of the application ids created by this mock.
+  private final AtomicInteger applicationCounter = new AtomicInteger(0);
+  // True if the Mock RM is running, false otherwise.
+  // This property allows us to write tests for specific scenario as Yarn RM
+  // down e.g. network issue, failover.
+  private boolean isRunning = true;
+  // Ids of the applications submitted to this mock and not yet killed.
+  private final Set<ApplicationId> applicationMap = new HashSet<>();
+
+  /**
+   * Fails the current call when the mock RM has been marked as stopped.
+   *
+   * @throws ConnectException if the mock is not running
+   */
+  private void validateRunning() throws ConnectException {
+    if (!isRunning) {
+      throw new ConnectException("RM is stopped");
+    }
+  }
+
+  /**
+   * Creates a new application id from the numeric SubCluster id of this mock
+   * and the next value of the internal counter.
+   *
+   * @return a 200 response whose entity is the {@code NewApplication}
+   * @throws IOException if the mock is not running (as a
+   *           {@link ConnectException})
+   */
+  @Override
+  public Response createNewApplication(HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    validateRunning();
+
+    // parseInt avoids the needless boxing of Integer.valueOf.
+    ApplicationId applicationId =
+        ApplicationId.newInstance(Integer.parseInt(getSubClusterId().getId()),
+            applicationCounter.incrementAndGet());
+    NewApplication appId =
+        new NewApplication(applicationId.toString(), new ResourceInfo());
+    return Response.status(Status.OK).entity(appId).build();
+  }
+
+  /**
+   * Records the application id carried by the request as submitted.
+   *
+   * @return a 202 response whose entity is the SubCluster id of this mock
+   * @throws IOException if the mock is not running (as a
+   *           {@link ConnectException})
+   */
+  @Override
+  public Response submitApplication(ApplicationSubmissionContextInfo newApp,
+      HttpServletRequest hsr)
+      throws AuthorizationException, IOException, InterruptedException {
+    validateRunning();
+
+    ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
+    LOG.info("Application submitted: " + appId);
+    applicationMap.add(appId);
+    return Response.status(Status.ACCEPTED).header(HttpHeaders.LOCATION, "")
+        .entity(getSubClusterId()).build();
+  }
+
+  /**
+   * Returns an empty {@code AppInfo} for an application previously submitted
+   * to this mock.
+   *
+   * @throws RuntimeException if the mock is not running
+   * @throws NotFoundException if the application was never submitted here
+   */
+  @Override
+  public AppInfo getApp(HttpServletRequest hsr, String appId,
+      Set<String> unselectedFields) {
+    // The signature declares no checked exception, hence the unchecked
+    // variant of the "running" check.
+    if (!isRunning) {
+      throw new RuntimeException("RM is stopped");
+    }
+
+    ApplicationId applicationId = ApplicationId.fromString(appId);
+    if (!applicationMap.contains(applicationId)) {
+      throw new NotFoundException("app with id: " + appId + " not found");
+    }
+
+    return new AppInfo();
+  }
+
+  /**
+   * Kills a previously submitted application: it is forgotten by the mock
+   * and the requested state is echoed back.
+   *
+   * @return a 200 response carrying the requested state, or a 400 response
+   *         when {@code targetState} is null
+   * @throws ApplicationNotFoundException if the application is unknown
+   * @throws IOException if the mock is not running (as a
+   *           {@link ConnectException})
+   */
+  @Override
+  public Response updateAppState(AppState targetState, HttpServletRequest hsr,
+      String appId) throws AuthorizationException, YarnException,
+      InterruptedException, IOException {
+    validateRunning();
+
+    ApplicationId applicationId = ApplicationId.fromString(appId);
+    if (!applicationMap.contains(applicationId)) {
+      throw new ApplicationNotFoundException(
+          "Trying to kill an absent application: " + appId);
+    }
+
+    // Validate the request before mutating state, so that a rejected request
+    // does not silently drop the application from the mock.
+    if (targetState == null) {
+      return Response.status(Status.BAD_REQUEST).build();
+    }
+
+    applicationMap.remove(applicationId);
+    LOG.info("Force killing application: " + appId);
+    AppState ret = new AppState();
+    ret.setState(targetState.toString());
+    return Response.status(Status.OK).entity(ret).build();
+  }
+
+  /**
+   * Convenience overload that sets the SubCluster id from an int.
+   *
+   * @param subClusterId the numeric id, stored in its decimal string form
+   */
+  public void setSubClusterId(int subClusterId) {
+    setSubClusterId(SubClusterId.newInstance(Integer.toString(subClusterId)));
+  }
+
+  /** @return true while the mock RM is considered reachable */
+  public boolean isRunning() {
+    return isRunning;
+  }
+
+  /**
+   * Switches the mock RM between reachable and stopped.
+   *
+   * @param runningMode false to make the mock fail as a stopped RM
+   */
+  public void setRunning(boolean runningMode) {
+    this.isRunning = runningMode;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
new file mode 100644
index 0000000..d918149
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorREST.java
@@ -0,0 +1,379 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppState;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the {@code BaseRouterWebServicesTest} and overrides methods in order
+ * to use the {@code RouterWebServices} pipeline test cases for testing the
+ * {@code FederationInterceptorREST} class. The tests for
+ * {@code RouterWebServices} have been written so that they can be reused to
+ * validate different request interceptor chains.
+ */
+public class TestFederationInterceptorREST extends BaseRouterWebServicesTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationInterceptorREST.class);
+  private final static int NUM_SUBCLUSTER = 4;
+  private static final int BAD_REQUEST = 400;
+  private static final int ACCEPTED = 202;
+  private static String user = "test-user";
+  private TestableFederationInterceptorREST interceptor;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreTestUtil stateStoreUtil;
+  private List<SubClusterId> subClusters;
+
+  /**
+   * Builds the interceptor under test on top of a fresh in-memory state store
+   * and registers NUM_SUBCLUSTER SubClusters whose ids are the decimal strings
+   * of 0 to NUM_SUBCLUSTER - 1.
+   */
+  @Override
+  public void setUp() {
+    super.setUpConfig();
+    interceptor = new TestableFederationInterceptorREST();
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(this.getConf());
+    FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+        this.getConf());
+    stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+
+    interceptor.setConf(this.getConf());
+    interceptor.init(user);
+
+    subClusters = new ArrayList<>();
+
+    try {
+      for (int i = 0; i < NUM_SUBCLUSTER; i++) {
+        SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
+        stateStoreUtil.registerSubCluster(sc);
+        subClusters.add(sc);
+      }
+    } catch (YarnException e) {
+      // Keep the stack trace in the log and surface the reason in the failure.
+      LOG.error("Failed to register the SubClusters", e);
+      Assert.fail("Failed to register the SubClusters: " + e.getMessage());
+    }
+
+  }
+
+  @Override
+  public void tearDown() {
+    interceptor.shutdown();
+    super.tearDown();
+  }
+
+  @Override
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    conf.set(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
+        MockDefaultRequestInterceptorREST.class.getName());
+    String mockPassThroughInterceptorClass =
+        PassThroughRESTRequestInterceptor.class.getName();
+
+    // Create a request intercepter pipeline for testing. The last one in the
+    // chain is the federation intercepter that calls the mock resource manager.
+    // The others in the chain will simply forward it to the next one in the
+    // chain
+    conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + ","
+            + TestableFederationInterceptorREST.class.getName());
+
+    conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+        UniformBroadcastPolicyManager.class.getName());
+
+    // Disable StateStoreFacade cache
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+    return conf;
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication. The return
+   * ApplicationId has to belong to one of the SubCluster in the cluster.
+   */
+  @Test
+  public void testGetNewApplication()
+      throws YarnException, IOException, InterruptedException {
+
+    Response response = interceptor.createNewApplication(null);
+
+    Assert.assertNotNull(response);
+    NewApplication ci = (NewApplication) response.getEntity();
+    Assert.assertNotNull(ci);
+    ApplicationId appId = ApplicationId.fromString(ci.getApplicationId());
+    Assert.assertTrue(appId.getClusterTimestamp() < NUM_SUBCLUSTER);
+    Assert.assertTrue(appId.getClusterTimestamp() >= 0);
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication. The application
+   * has to be submitted to one of the SubClusters in the cluster, and the
+   * SubCluster returned in the response has to match the home SubCluster
+   * recorded in the StateStore.
+   */
+  @Test
+  public void testSubmitApplication()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    Response response = interceptor.submitApplication(context, null);
+    // Check for null before dereferencing the response.
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ACCEPTED, response.getStatus());
+    SubClusterId ci = (SubClusterId) response.getEntity();
+
+    SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult);
+    Assert.assertTrue(subClusters.contains(scIdResult));
+    Assert.assertEquals(ci, scIdResult);
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case of
+   * multiple submission. The first retry has to be submitted to the same
+   * SubCluster of the first attempt.
+   */
+  @Test
+  public void testSubmitApplicationMultipleSubmission()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // First attempt
+    Response response = interceptor.submitApplication(context, null);
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ACCEPTED, response.getStatus());
+
+    SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult);
+
+    // First retry
+    response = interceptor.submitApplication(context, null);
+
+    Assert.assertNotNull(response);
+    Assert.assertEquals(ACCEPTED, response.getStatus());
+    SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult2);
+    Assert.assertEquals(scIdResult, scIdResult2);
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case of empty
+   * request.
+   */
+  @Test
+  public void testSubmitApplicationEmptyRequest()
+      throws YarnException, IOException, InterruptedException {
+
+    // ApplicationSubmissionContextInfo null
+    Response response = interceptor.submitApplication(null, null);
+
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
+    // ApplicationSubmissionContextInfo empty
+    response = interceptor
+        .submitApplication(new ApplicationSubmissionContextInfo(), null);
+
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    response = interceptor.submitApplication(context, null);
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case of an
+   * application id in the wrong format.
+   */
+  @Test
+  public void testSubmitApplicationWrongFormat()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId("Application_wrong_id");
+    Response response = interceptor.submitApplication(context, null);
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testForceKillApplication()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // Submit the application we are going to kill later
+    Response response = interceptor.submitApplication(context, null);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    AppState appState = new AppState("KILLED");
+
+    Response responseKill =
+        interceptor.updateAppState(appState, null, appId.toString());
+    Assert.assertNotNull(responseKill);
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case the
+   * application does not exist in the StateStore: a YarnException with the
+   * "does not exist" message is expected.
+   */
+  @Test
+  public void testForceKillApplicationNotExists()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    AppState appState = new AppState("KILLED");
+    try {
+      interceptor.updateAppState(appState, null, appId.toString());
+      Assert.fail("Expected a YarnException for the unknown application");
+    } catch (YarnException e) {
+      // assertEquals reports both messages when they differ.
+      Assert.assertEquals("Application " + appId + " does not exist",
+          e.getMessage());
+    }
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case of
+   * application in wrong format.
+   */
+  @Test
+  public void testForceKillApplicationWrongFormat()
+      throws YarnException, IOException, InterruptedException {
+
+    AppState appState = new AppState("KILLED");
+    Response response =
+        interceptor.updateAppState(appState, null, "Application_wrong_id");
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case of
+   * empty request.
+   */
+  @Test
+  public void testForceKillApplicationEmptyRequest()
+      throws YarnException, IOException, InterruptedException {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // Submit the application we are going to kill later
+    interceptor.submitApplication(context, null);
+
+    Response response =
+        interceptor.updateAppState(null, null, appId.toString());
+    Assert.assertEquals(BAD_REQUEST, response.getStatus());
+
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationReport in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testGetApplicationReport()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    // Submit the application we want the report later
+    Response response = interceptor.submitApplication(context, null);
+
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    AppInfo responseGet = interceptor.getApp(null, appId.toString(), null);
+
+    Assert.assertNotNull(responseGet);
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationReport in case the
+   * application does not exist in StateStore.
+   */
+  @Test
+  public void testGetApplicationNotExists()
+      throws YarnException, IOException, InterruptedException {
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    AppInfo response = interceptor.getApp(null, appId.toString(), null);
+
+    Assert.assertNull(response);
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationReport in case of
+   * application in wrong format.
+   */
+  @Test
+  public void testGetApplicationWrongFormat()
+      throws YarnException, IOException, InterruptedException {
+
+    AppInfo response = interceptor.getApp(null, "Application_wrong_id", null);
+
+    Assert.assertNull(response);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
new file mode 100644
index 0000000..48bc1a8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import javax.ws.rs.core.Response;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.ApplicationSubmissionContextInfo;
+import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.NewApplication;
+import org.apache.hadoop.yarn.server.router.clientrm.PassThroughClientRequestInterceptor;
+import org.apache.hadoop.yarn.server.router.clientrm.TestableFederationClientInterceptor;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the {@code BaseRouterWebServicesTest} and overrides methods in order
+ * to use the {@code RouterWebServices} pipeline test cases for testing the
+ * {@code FederationInterceptorREST} class. The tests for
+ * {@code RouterWebServices} have been written so that they can be reused to
+ * validate different request interceptor chains.
+ * <p>
+ * It tests the case with SubClusters down and the Router logic of retries. We
+ * have 1 good SubCluster and 2 bad ones for all the tests.
+ */
+public class TestFederationInterceptorRESTRetry
+    extends BaseRouterWebServicesTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationInterceptorRESTRetry.class);
+  private static final int SERVICE_UNAVAILABLE = 503;
+  private static final int ACCEPTED = 202;
+  private static final int OK = 200;
+  // running and registered
+  private static SubClusterId good;
+  // registered but not running
+  private static SubClusterId bad1;
+  private static SubClusterId bad2;
+  private static List<SubClusterId> scs = new ArrayList<SubClusterId>();
+  private TestableFederationInterceptorREST interceptor;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreTestUtil stateStoreUtil;
+  private String user = "test-user";
+
+  /**
+   * Builds the interceptor under test on top of a fresh in-memory state store,
+   * creates the three SubCluster ids used by the tests ("0" good, "1" and "2"
+   * bad) and marks the bad ones as not running in the interceptor.
+   */
+  @Override
+  public void setUp() {
+    super.setUpConfig();
+    interceptor = new TestableFederationInterceptorREST();
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(this.getConf());
+    FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+        getConf());
+    stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+
+    interceptor.setConf(this.getConf());
+    interceptor.init(user);
+
+    // Create SubClusters
+    good = SubClusterId.newInstance("0");
+    bad1 = SubClusterId.newInstance("1");
+    bad2 = SubClusterId.newInstance("2");
+    // scs is static: reset it so that entries do not pile up across tests.
+    scs.clear();
+    scs.add(good);
+    scs.add(bad1);
+    scs.add(bad2);
+
+    // The mock RM will not start in these SubClusters, this is done to simulate
+    // a SubCluster down
+
+    interceptor.registerBadSubCluster(bad1);
+    interceptor.registerBadSubCluster(bad2);
+  }
+
+  @Override
+  public void tearDown() {
+    interceptor.shutdown();
+    super.tearDown();
+  }
+
+  /**
+   * Resets the StateStore so that it contains exactly the given SubClusters.
+   * A failure while talking to the StateStore fails the calling test.
+   *
+   * @param scsToRegister the SubClusters to register
+   * @throws YarnException kept in the signature for the callers; failures are
+   *           reported through a test failure instead
+   */
+  private void setupCluster(List<SubClusterId> scsToRegister)
+      throws YarnException {
+
+    try {
+      // Clean up the StateStore before every test
+      stateStoreUtil.deregisterAllSubClusters();
+
+      for (SubClusterId sc : scsToRegister) {
+        stateStoreUtil.registerSubCluster(sc);
+      }
+    } catch (YarnException e) {
+      // Keep the stack trace in the log and surface the reason in the failure.
+      LOG.error("Failed to set up the SubClusters", e);
+      Assert.fail("Failed to set up the SubClusters: " + e.getMessage());
+    }
+  }
+
+  @Override
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+
+    conf.set(YarnConfiguration.ROUTER_WEBAPP_DEFAULT_INTERCEPTOR_CLASS,
+        MockDefaultRequestInterceptorREST.class.getName());
+
+    String mockPassThroughInterceptorClass =
+        PassThroughClientRequestInterceptor.class.getName();
+
+    // Create a request intercepter pipeline for testing. The last one in the
+    // chain is the federation intercepter that calls the mock resource manager.
+    // The others in the chain will simply forward it to the next one in the
+    // chain
+    conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + ","
+            + TestableFederationClientInterceptor.class.getName());
+
+    conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+        UniformBroadcastPolicyManager.class.getName());
+
+    // Disable StateStoreFacade cache
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+    return conf;
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication in case the
+   * cluster is composed of only 1 bad SubCluster.
+   */
+  @Test
+  public void testGetNewApplicationOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+
+    setupCluster(Arrays.asList(bad2));
+
+    Response response = interceptor.createNewApplication(null);
+    Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
+    Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
+        response.getEntity());
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication in case the
+   * cluster is composed of only 2 bad SubClusters.
+   */
+  @Test
+  public void testGetNewApplicationTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    Response response = interceptor.createNewApplication(null);
+    Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
+    Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
+        response.getEntity());
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication in case the
+   * cluster is composed of only 1 bad SubCluster and 1 good one. The new
+   * application id has to carry the id of the good SubCluster as its cluster
+   * timestamp.
+   */
+  @Test
+  public void testGetNewApplicationOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    // Use the test logger rather than stdout.
+    LOG.info("Test getNewApplication with one bad, one good SC");
+    setupCluster(Arrays.asList(good, bad2));
+    Response response = interceptor.createNewApplication(null);
+
+    Assert.assertEquals(OK, response.getStatus());
+
+    NewApplication newApp = (NewApplication) response.getEntity();
+    ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
+
+    Assert.assertEquals(Integer.parseInt(good.getId()),
+        appId.getClusterTimestamp());
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case the
+   * cluster is composed of only 1 bad SubCluster.
+   */
+  @Test
+  public void testSubmitApplicationOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+
+    setupCluster(Arrays.asList(bad2));
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    Response response = interceptor.submitApplication(context, null);
+    Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
+    Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
+        response.getEntity());
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case the
+   * cluster is composed of only 2 bad SubClusters.
+   */
+  @Test
+  public void testSubmitApplicationTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    setupCluster(Arrays.asList(bad1, bad2));
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+
+    Response response = interceptor.submitApplication(context, null);
+    Assert.assertEquals(SERVICE_UNAVAILABLE, response.getStatus());
+    Assert.assertEquals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE,
+        response.getEntity());
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case the
+   * cluster is composed of only 1 bad SubCluster and a good one. The
+   * application has to be accepted and its home SubCluster in the StateStore
+   * has to be the good one.
+   */
+  @Test
+  public void testSubmitApplicationOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    // Use the test logger rather than stdout.
+    LOG.info("Test submitApplication with one bad, one good SC");
+    setupCluster(Arrays.asList(good, bad2));
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContextInfo context =
+        new ApplicationSubmissionContextInfo();
+    context.setApplicationId(appId.toString());
+    Response response = interceptor.submitApplication(context, null);
+
+    Assert.assertEquals(ACCEPTED, response.getStatus());
+
+    Assert.assertEquals(good,
+        stateStore
+            .getApplicationHomeSubCluster(
+                GetApplicationHomeSubClusterRequest.newInstance(appId))
+            .getApplicationHomeSubCluster().getHomeSubCluster());
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
new file mode 100644
index 0000000..ce5bb23
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestableFederationInterceptorREST.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.webapp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+/**
+ * Extends the FederationInterceptorREST with a helper that lets the tests mark
+ * SubClusters as not running.
+ */
+public class TestableFederationInterceptorREST
+    extends FederationInterceptorREST {
+
+  // SubClusters that have been marked as not running through
+  // registerBadSubCluster.
+  private final List<SubClusterId> badSubCluster = new ArrayList<>();
+
+  /**
+   * For testing purposes, some SubClusters have to be down to simulate
+   * particular scenarios such as RM failover or network issues. For this
+   * reason we keep track of these bad SubClusters. This method makes the
+   * given SubCluster unusable.
+   *
+   * @param badSC the subcluster to make unusable
+   */
+  protected void registerBadSubCluster(SubClusterId badSC) {
+
+    // Make sure an interceptor for the bad SubCluster is in the cache, so
+    // that it can be looked up and stopped below.
+    getOrCreateInterceptorForSubCluster(badSC, "test");
+
+    badSubCluster.add(badSC);
+    MockDefaultRequestInterceptorREST interceptor =
+        (MockDefaultRequestInterceptorREST) super.getInterceptorForSubCluster(
+            badSC);
+    interceptor.setRunning(false);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8220b19a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
index ecf61c5..3e3580c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/Federation.md
@@ -216,7 +216,7 @@ Optional:
 |`yarn.router.submit.retry` | `3` | The number of retries in the router before we give up. |
 |`yarn.federation.statestore.max-connections` | `10` | This is the maximum number of parallel connections each Router makes to the state-store. |
 |`yarn.federation.cache-ttl.secs` | `60` | The Router caches informations, and this is the time to leave before the cache is invalidated. |
-
+|`yarn.router.webapp.interceptor-class.pipeline` | `org.apache.hadoop.yarn.server.router.webapp.FederationInterceptorREST` | A comma-separated list of interceptor classes to be run at the router when interfacing with the client via REST interface. The last step of this pipeline must be the Federation Interceptor REST. |
 
 ###ON NMs:
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org