You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/08/02 00:25:22 UTC

[40/50] [abbrv] hadoop git commit: YARN-3659. Federation: routing client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).

YARN-3659. Federation: routing client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/52daa6d9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/52daa6d9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/52daa6d9

Branch: refs/heads/YARN-2915
Commit: 52daa6d971ae408d121b3737ea8c0575e7e8516d
Parents: f8e5de5
Author: Subru Krishnan <su...@apache.org>
Authored: Mon Jun 26 13:27:26 2017 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue Aug 1 17:22:12 2017 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/conf/YarnConfiguration.java     |   8 +
 .../yarn/conf/TestYarnConfigurationFields.java  |   5 +
 .../hadoop/yarn/conf/TestYarnConfiguration.java |   1 -
 ...ionPolicyInitializationContextValidator.java |   2 +-
 .../policies/FederationPolicyUtils.java         |  35 +
 .../federation/policies/RouterPolicyFacade.java |   3 +-
 .../policies/router/HashBasedRouterPolicy.java  |   5 +
 .../policies/router/LoadBasedRouterPolicy.java  |   5 +
 .../policies/router/PriorityRouterPolicy.java   |   5 +
 .../router/UniformRandomRouterPolicy.java       |   4 +
 .../router/WeightedRandomRouterPolicy.java      |   5 +
 .../yarn/server/MockResourceManagerFacade.java  | 165 ++++-
 .../policies/router/BaseRouterPoliciesTest.java |  28 +
 .../utils/FederationStateStoreTestUtil.java     |  15 +-
 .../hadoop-yarn-server-router/pom.xml           |   5 +
 .../yarn/server/router/RouterServerUtil.java    |  63 ++
 .../AbstractClientRequestInterceptor.java       |  41 +-
 .../DefaultClientRequestInterceptor.java        |  31 +-
 .../clientrm/FederationClientInterceptor.java   | 684 +++++++++++++++++++
 .../router/clientrm/BaseRouterClientRMTest.java |  22 +-
 .../TestFederationClientInterceptor.java        | 403 +++++++++++
 .../TestFederationClientInterceptorRetry.java   | 295 ++++++++
 .../TestableFederationClientInterceptor.java    |  75 ++
 23 files changed, 1866 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 1432867..0c0dd11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2674,6 +2674,14 @@ public class YarnConfiguration extends Configuration {
       "org.apache.hadoop.yarn.server.router.rmadmin."
           + "DefaultRMAdminRequestInterceptor";
 
+  /**
+   * The number of retries for GetNewApplication and SubmitApplication in
+   * {@code FederationClientInterceptor}.
+   */
+  public static final String ROUTER_CLIENTRM_SUBMIT_RETRY =
+      ROUTER_PREFIX + "submit.retry";
+  public static final int DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY = 3;
+
   ////////////////////////////////
   // Other Configs
   ////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index c3cb78d..a439cd5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -136,6 +136,11 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
     configurationPrefixToSkipCompare
         .add(YarnConfiguration.NM_CPU_RESOURCE_ENABLED);
 
+    // Ignore all Router Federation variables
+
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY);
+
     // Set by container-executor.cfg
     configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
index 7389423..a053fdb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfiguration.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 import org.junit.Test;
 
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
index 3c44e7e..da63bc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
@@ -51,7 +51,7 @@ public final class FederationPolicyInitializationContextValidator {
 
     if (policyContext.getFederationSubclusterResolver() == null) {
       throw new FederationPolicyInitializationException(
-          "The FederationStateStoreFacase provided is null. Cannot"
+          "The FederationSubclusterResolver provided is null. Cannot"
               + " reinitalize successfully.");
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
index 37ce942..97e4848 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyUtils.java
@@ -19,12 +19,14 @@ package org.apache.hadoop.yarn.server.federation.policies;
 
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -41,6 +43,9 @@ public final class FederationPolicyUtils {
   private static final Logger LOG =
       LoggerFactory.getLogger(FederationPolicyUtils.class);
 
+  public static final String NO_ACTIVE_SUBCLUSTER_AVAILABLE =
+      "No active SubCluster available to submit the request.";
+
   /** Disable constructor. */
   private FederationPolicyUtils() {
   }
@@ -165,4 +170,34 @@ public final class FederationPolicyUtils {
     return federationPolicyManager.getAMRMPolicy(context, oldPolicy);
   }
 
+  /**
+   * Validate if there is any active subcluster that is not blacklisted, it will
+   * throw an exception if there are no usable subclusters.
+   *
+   * @param activeSubClusters the list of subClusters as identified by
+   *          {@link SubClusterId} currently active.
+   * @param blackListSubClusters the list of subClusters as identified by
+   *          {@link SubClusterId} to blackList from the selection of the home
+   *          subCluster.
+   * @throws FederationPolicyException if there are no usable subclusters.
+   */
+  public static void validateSubClusterAvailability(
+      List<SubClusterId> activeSubClusters,
+      List<SubClusterId> blackListSubClusters)
+      throws FederationPolicyException {
+    if (activeSubClusters != null && !activeSubClusters.isEmpty()) {
+      if (blackListSubClusters == null) {
+        return;
+      }
+      for (SubClusterId scId : activeSubClusters) {
+        if (!blackListSubClusters.contains(scId)) {
+          // There is at least one active subcluster
+          return;
+        }
+      }
+    }
+    throw new FederationPolicyException(
+        FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
+  }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
index 44c1b10..52c2905 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -56,7 +57,7 @@ public class RouterPolicyFacade {
   @VisibleForTesting
   Map<String, FederationRouterPolicy> globalPolicyMap;
 
-  public RouterPolicyFacade(YarnConfiguration conf,
+  public RouterPolicyFacade(Configuration conf,
       FederationStateStoreFacade facade, SubClusterResolver resolver,
       SubClusterId homeSubcluster)
       throws FederationPolicyInitializationException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
index 257a9fe..cc11880 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -76,6 +77,10 @@ public class HashBasedRouterPolicy extends AbstractRouterPolicy {
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 
+    FederationPolicyUtils.validateSubClusterAvailability(
+        new ArrayList<SubClusterId>(activeSubclusters.keySet()),
+        blackListSubClusters);
+
     if (blackListSubClusters != null) {
 
       // Remove from the active SubClusters from StateStore the blacklisted ones

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
index c124001..06e445b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -17,12 +17,14 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
@@ -72,6 +74,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 
+    FederationPolicyUtils.validateSubClusterAvailability(
+        new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
+
     Map<SubClusterIdInfo, Float> weights =
         getPolicyInfo().getRouterPolicyWeights();
     SubClusterIdInfo chosen = null;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
index 59f8767..a1f7666 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -17,11 +17,13 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -44,6 +46,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 
+    FederationPolicyUtils.validateSubClusterAvailability(
+        new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
+
     // This finds the sub-cluster with the highest weight among the
     // currently active ones.
     Map<SubClusterIdInfo, Float> weights =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
index bc729b7..7a8be91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -86,6 +87,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
 
     List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
 
+    FederationPolicyUtils.validateSubClusterAvailability(list,
+        blackListSubClusters);
+
     if (blackListSubClusters != null) {
 
       // Remove from the active SubClusters from StateStore the blacklisted ones

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index 7f230a7..aec7576 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.yarn.server.federation.policies.router;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
@@ -51,6 +53,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
     Map<SubClusterId, SubClusterInfo> activeSubclusters =
         getActiveSubclusters();
 
+    FederationPolicyUtils.validateSubClusterAvailability(
+        new ArrayList<SubClusterId>(activeSubclusters.keySet()), blacklist);
+
     // note: we cannot pre-compute the weights, as the set of activeSubcluster
     // changes dynamically (and this would unfairly spread the load to
     // sub-clusters adjacent to an inactive one), hence we need to count/scan

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
index 65c12c6..68c55ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server;
 
 import java.io.IOException;
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -154,6 +155,7 @@ import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import com.google.common.base.Strings;
 
 /**
@@ -175,6 +177,13 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       new HashMap<ContainerId, Container>();
   private AtomicInteger containerIndex = new AtomicInteger(0);
   private Configuration conf;
+  private int subClusterId;
+  final private AtomicInteger applicationCounter = new AtomicInteger(0);
+
+  // True if the Mock RM is running, false otherwise.
+  // This property allows us to write tests for specific scenario as Yarn RM
+  // down e.g. network issue, failover.
+  private boolean isRunning;
 
   private boolean shouldReRegisterNext = false;
 
@@ -187,14 +196,25 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
 
   public MockResourceManagerFacade(Configuration conf,
       int startContainerIndex) {
+    this(conf, startContainerIndex, 0, true);
+  }
+
+  public MockResourceManagerFacade(Configuration conf, int startContainerIndex,
+      int subClusterId, boolean isRunning) {
     this.conf = conf;
     this.containerIndex.set(startContainerIndex);
+    this.subClusterId = subClusterId;
+    this.isRunning = isRunning;
   }
 
   public void setShouldReRegisterNext() {
     shouldReRegisterNext = true;
   }
 
+  public void setRunningMode(boolean mode) {
+    this.isRunning = mode;
+  }
+
   private static String getAppIdentifier() throws IOException {
     AMRMTokenIdentifier result = null;
     UserGroupInformation remoteUgi = UserGroupInformation.getCurrentUser();
@@ -208,10 +228,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
     return result != null ? result.getApplicationAttemptId().toString() : "";
   }
 
+  private void validateRunning() throws ConnectException {
+    if (!isRunning) {
+      throw new ConnectException("RM is stopped");
+    }
+  }
+
   @Override
   public RegisterApplicationMasterResponse registerApplicationMaster(
       RegisterApplicationMasterRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     String amrmToken = getAppIdentifier();
     LOG.info("Registering application attempt: " + amrmToken);
 
@@ -248,6 +277,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public FinishApplicationMasterResponse finishApplicationMaster(
       FinishApplicationMasterRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     String amrmToken = getAppIdentifier();
     LOG.info("Finishing application attempt: " + amrmToken);
 
@@ -284,6 +316,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   @Override
   public AllocateResponse allocate(AllocateRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     if (request.getAskList() != null && request.getAskList().size() > 0
         && request.getReleaseList() != null
         && request.getReleaseList().size() > 0) {
@@ -391,6 +426,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public GetApplicationReportResponse getApplicationReport(
       GetApplicationReportRequest request) throws YarnException, IOException {
 
+    validateRunning();
+
     GetApplicationReportResponse response =
         Records.newRecord(GetApplicationReportResponse.class);
     ApplicationReport report = Records.newRecord(ApplicationReport.class);
@@ -407,6 +444,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
       GetApplicationAttemptReportRequest request)
       throws YarnException, IOException {
 
+    validateRunning();
+
     GetApplicationAttemptReportResponse response =
         Records.newRecord(GetApplicationAttemptReportResponse.class);
     ApplicationAttemptReport report =
@@ -420,12 +459,19 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   @Override
   public GetNewApplicationResponse getNewApplication(
       GetNewApplicationRequest request) throws YarnException, IOException {
-    return GetNewApplicationResponse.newInstance(null, null, null);
+
+    validateRunning();
+
+    return GetNewApplicationResponse.newInstance(ApplicationId.newInstance(
+        subClusterId, applicationCounter.incrementAndGet()), null, null);
   }
 
   @Override
   public SubmitApplicationResponse submitApplication(
       SubmitApplicationRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     ApplicationId appId = null;
     if (request.getApplicationSubmissionContext() != null) {
       appId = request.getApplicationSubmissionContext().getApplicationId();
@@ -438,6 +484,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   @Override
   public KillApplicationResponse forceKillApplication(
       KillApplicationRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     ApplicationId appId = null;
     if (request.getApplicationId() != null) {
       appId = request.getApplicationId();
@@ -453,48 +502,72 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   @Override
   public GetClusterMetricsResponse getClusterMetrics(
       GetClusterMetricsRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return GetClusterMetricsResponse.newInstance(null);
   }
 
   @Override
   public GetApplicationsResponse getApplications(GetApplicationsRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return GetApplicationsResponse.newInstance(null);
   }
 
   @Override
   public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return GetClusterNodesResponse.newInstance(null);
   }
 
   @Override
   public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return GetQueueInfoResponse.newInstance(null);
   }
 
   @Override
   public GetQueueUserAclsInfoResponse getQueueUserAcls(
       GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return GetQueueUserAclsInfoResponse.newInstance(null);
   }
 
   @Override
   public GetDelegationTokenResponse getDelegationToken(
       GetDelegationTokenRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return GetDelegationTokenResponse.newInstance(null);
   }
 
   @Override
   public RenewDelegationTokenResponse renewDelegationToken(
       RenewDelegationTokenRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return RenewDelegationTokenResponse.newInstance(0);
   }
 
   @Override
   public CancelDelegationTokenResponse cancelDelegationToken(
       CancelDelegationTokenRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return CancelDelegationTokenResponse.newInstance();
   }
 
@@ -502,36 +575,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
       MoveApplicationAcrossQueuesRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return MoveApplicationAcrossQueuesResponse.newInstance();
   }
 
   @Override
   public GetApplicationAttemptsResponse getApplicationAttempts(
       GetApplicationAttemptsRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return GetApplicationAttemptsResponse.newInstance(null);
   }
 
   @Override
   public GetContainerReportResponse getContainerReport(
       GetContainerReportRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return GetContainerReportResponse.newInstance(null);
   }
 
   @Override
   public GetContainersResponse getContainers(GetContainersRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return GetContainersResponse.newInstance(null);
   }
 
   @Override
   public ReservationSubmissionResponse submitReservation(
       ReservationSubmissionRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return ReservationSubmissionResponse.newInstance();
   }
 
   @Override
   public ReservationListResponse listReservations(
       ReservationListRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return ReservationListResponse
         .newInstance(new ArrayList<ReservationAllocationState>());
   }
@@ -539,18 +630,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   @Override
   public ReservationUpdateResponse updateReservation(
       ReservationUpdateRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return ReservationUpdateResponse.newInstance();
   }
 
   @Override
   public ReservationDeleteResponse deleteReservation(
       ReservationDeleteRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return ReservationDeleteResponse.newInstance();
   }
 
   @Override
   public GetNodesToLabelsResponse getNodeToLabels(
       GetNodesToLabelsRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return GetNodesToLabelsResponse
         .newInstance(new HashMap<NodeId, Set<String>>());
   }
@@ -558,18 +658,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   @Override
   public GetClusterNodeLabelsResponse getClusterNodeLabels(
       GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return GetClusterNodeLabelsResponse.newInstance(new ArrayList<NodeLabel>());
   }
 
   @Override
   public GetLabelsToNodesResponse getLabelsToNodes(
       GetLabelsToNodesRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return GetLabelsToNodesResponse.newInstance(null);
   }
 
   @Override
   public GetNewReservationResponse getNewReservation(
       GetNewReservationRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return GetNewReservationResponse
         .newInstance(ReservationId.newInstance(0, 0));
   }
@@ -577,6 +686,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   @Override
   public FailApplicationAttemptResponse failApplicationAttempt(
       FailApplicationAttemptRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return FailApplicationAttemptResponse.newInstance();
   }
 
@@ -584,12 +696,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public UpdateApplicationPriorityResponse updateApplicationPriority(
       UpdateApplicationPriorityRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return UpdateApplicationPriorityResponse.newInstance(null);
   }
 
   @Override
   public SignalContainerResponse signalToContainer(
       SignalContainerRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return new SignalContainerResponsePBImpl();
   }
 
@@ -597,18 +715,27 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
       UpdateApplicationTimeoutsRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return UpdateApplicationTimeoutsResponse.newInstance();
   }
 
   @Override
   public RefreshQueuesResponse refreshQueues(RefreshQueuesRequest request)
       throws StandbyException, YarnException, IOException {
+
+    validateRunning();
+
     return RefreshQueuesResponse.newInstance();
   }
 
   @Override
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
       throws StandbyException, YarnException, IOException {
+
+    validateRunning();
+
     return RefreshNodesResponse.newInstance();
   }
 
@@ -616,6 +743,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
       RefreshSuperUserGroupsConfigurationRequest request)
       throws StandbyException, YarnException, IOException {
+
+    validateRunning();
+
     return RefreshSuperUserGroupsConfigurationResponse.newInstance();
   }
 
@@ -623,36 +753,54 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public RefreshUserToGroupsMappingsResponse refreshUserToGroupsMappings(
       RefreshUserToGroupsMappingsRequest request)
       throws StandbyException, YarnException, IOException {
+
+    validateRunning();
+
     return RefreshUserToGroupsMappingsResponse.newInstance();
   }
 
   @Override
   public RefreshAdminAclsResponse refreshAdminAcls(
       RefreshAdminAclsRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return RefreshAdminAclsResponse.newInstance();
   }
 
   @Override
   public RefreshServiceAclsResponse refreshServiceAcls(
       RefreshServiceAclsRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return RefreshServiceAclsResponse.newInstance();
   }
 
   @Override
   public UpdateNodeResourceResponse updateNodeResource(
       UpdateNodeResourceRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return UpdateNodeResourceResponse.newInstance();
   }
 
   @Override
   public RefreshNodesResourcesResponse refreshNodesResources(
       RefreshNodesResourcesRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return RefreshNodesResourcesResponse.newInstance();
   }
 
   @Override
   public AddToClusterNodeLabelsResponse addToClusterNodeLabels(
       AddToClusterNodeLabelsRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return AddToClusterNodeLabelsResponse.newInstance();
   }
 
@@ -660,12 +808,18 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public RemoveFromClusterNodeLabelsResponse removeFromClusterNodeLabels(
       RemoveFromClusterNodeLabelsRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return RemoveFromClusterNodeLabelsResponse.newInstance();
   }
 
   @Override
   public ReplaceLabelsOnNodeResponse replaceLabelsOnNode(
       ReplaceLabelsOnNodeRequest request) throws YarnException, IOException {
+
+    validateRunning();
+
     return ReplaceLabelsOnNodeResponse.newInstance();
   }
 
@@ -673,6 +827,9 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public CheckForDecommissioningNodesResponse checkForDecommissioningNodes(
       CheckForDecommissioningNodesRequest checkForDecommissioningNodesRequest)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return CheckForDecommissioningNodesResponse.newInstance(null);
   }
 
@@ -680,11 +837,17 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol,
   public RefreshClusterMaxPriorityResponse refreshClusterMaxPriority(
       RefreshClusterMaxPriorityRequest request)
       throws YarnException, IOException {
+
+    validateRunning();
+
     return RefreshClusterMaxPriorityResponse.newInstance();
   }
 
   @Override
   public String[] getGroupsForUser(String user) throws IOException {
+
+    validateRunning();
+
     return new String[0];
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
index c7a7767..d09ba75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseRouterPoliciesTest.java
@@ -26,6 +26,7 @@ import java.util.Random;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -87,4 +88,31 @@ public abstract class BaseRouterPoliciesTest
       Assert.assertEquals(removed, chosen);
     }
   }
+
+  /**
+   * This test validates the correctness of blacklist logic in case the cluster
+   * has no active subclusters.
+   */
+  @Test
+  public void testAllBlacklistSubcluster() throws YarnException {
+    FederationRouterPolicy localPolicy = (FederationRouterPolicy) getPolicy();
+    ApplicationSubmissionContext applicationSubmissionContext =
+        ApplicationSubmissionContext.newInstance(null, null, null, null, null,
+            false, false, 0, Resources.none(), null, false, null, null);
+    Map<SubClusterId, SubClusterInfo> activeSubClusters =
+        getActiveSubclusters();
+    if (activeSubClusters != null && activeSubClusters.size() > 1
+        && !(localPolicy instanceof RejectRouterPolicy)) {
+      List<SubClusterId> blacklistSubclusters =
+          new ArrayList<SubClusterId>(activeSubClusters.keySet());
+      try {
+        localPolicy.getHomeSubcluster(applicationSubmissionContext,
+            blacklistSubclusters);
+        Assert.fail();
+      } catch (YarnException e) {
+        Assert.assertTrue(e.getMessage()
+            .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
index 649a61b..423bf86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreTestUtil.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolic
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
@@ -69,7 +70,7 @@ public class FederationStateStoreTestUtil {
         SubClusterState.SC_RUNNING, CLOCK.getTime(), "capability");
   }
 
-  private void registerSubCluster(SubClusterId subClusterId)
+  public void registerSubCluster(SubClusterId subClusterId)
       throws YarnException {
 
     SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
@@ -164,4 +165,16 @@ public class FederationStateStoreTestUtil {
     return result.getPolicyConfiguration();
   }
 
+  public void deregisterAllSubClusters() throws YarnException {
+    for (SubClusterId sc : getAllSubClusterIds(true)) {
+      deRegisterSubCluster(sc);
+    }
+  }
+
+  private void deRegisterSubCluster(SubClusterId subClusterId)
+      throws YarnException {
+    stateStore.deregisterSubCluster(SubClusterDeregisterRequest
+        .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
index f9169e1..f27b2b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/pom.xml
@@ -58,6 +58,11 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-common</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-common</artifactId>
       <type>test-jar</type>
       <scope>test</scope>
     </dependency>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
new file mode 100644
index 0000000..cc96da6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Common utility methods used by the Router server.
+ *
+ */
+@Private
+@Unstable
+public final class RouterServerUtil {
+
+  /** Disable constructor. */
+  private RouterServerUtil() {
+  }
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(RouterServerUtil.class);
+
+  /**
+   * Throws an exception due to an error.
+   *
+   * @param errMsg the error message
+   * @param t the throwable raised in the called class.
+   * @throws YarnException on failure
+   */
+  @Public
+  @Unstable
+  public static void logAndThrowException(String errMsg, Throwable t)
+      throws YarnException {
+    if (t != null) {
+      LOG.error(errMsg, t);
+      throw new YarnException(errMsg, t);
+    } else {
+      LOG.error(errMsg);
+      throw new YarnException(errMsg);
+    }
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
index 5980b03..01ba3bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/AbstractClientRequestInterceptor.java
@@ -18,7 +18,13 @@
 
 package org.apache.hadoop.yarn.server.router.clientrm;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Implements the {@link ClientRequestInterceptor} interface and provides common
@@ -28,9 +34,16 @@ import org.apache.hadoop.conf.Configuration;
  */
 public abstract class AbstractClientRequestInterceptor
     implements ClientRequestInterceptor {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractClientRequestInterceptor.class);
+
   private Configuration conf;
   private ClientRequestInterceptor nextInterceptor;
 
+  @SuppressWarnings("checkstyle:visibilitymodifier")
+  protected UserGroupInformation user = null;
+
   /**
    * Sets the {@link ClientRequestInterceptor} in the chain.
    */
@@ -63,9 +76,10 @@ public abstract class AbstractClientRequestInterceptor
    * Initializes the {@link ClientRequestInterceptor}.
    */
   @Override
-  public void init(String user) {
+  public void init(String userName) {
+    setupUser(userName);
     if (this.nextInterceptor != null) {
-      this.nextInterceptor.init(user);
+      this.nextInterceptor.init(userName);
     }
   }
 
@@ -87,4 +101,27 @@ public abstract class AbstractClientRequestInterceptor
     return this.nextInterceptor;
   }
 
+  private void setupUser(String userName) {
+
+    try {
+      // Do not create a proxy user if user name matches the user name on
+      // current UGI
+      if (userName.equalsIgnoreCase(
+          UserGroupInformation.getCurrentUser().getUserName())) {
+        user = UserGroupInformation.getCurrentUser();
+      } else {
+        user = UserGroupInformation.createProxyUser(userName,
+            UserGroupInformation.getCurrentUser());
+      }
+    } catch (IOException e) {
+      String message = "Error while creating Router ClientRM Service for user:";
+      if (user != null) {
+        message += ", user: " + user;
+      }
+
+      LOG.info(message);
+      throw new YarnRuntimeException(message, e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
index 9e2bfed..71de6b4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/DefaultClientRequestInterceptor.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
@@ -85,8 +84,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRespo
 import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -98,27 +95,14 @@ import com.google.common.annotations.VisibleForTesting;
  */
 public class DefaultClientRequestInterceptor
     extends AbstractClientRequestInterceptor {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(DefaultClientRequestInterceptor.class);
   private ApplicationClientProtocol clientRMProxy;
-  private UserGroupInformation user = null;
 
   @Override
   public void init(String userName) {
     super.init(userName);
-    try {
-      // Do not create a proxy user if user name matches the user name on
-      // current UGI
-      if (userName.equalsIgnoreCase(
-          UserGroupInformation.getCurrentUser().getUserName())) {
-        user = UserGroupInformation.getCurrentUser();
-      } else {
-        user = UserGroupInformation.createProxyUser(userName,
-            UserGroupInformation.getCurrentUser());
-      }
-
-      final Configuration conf = this.getConf();
 
+    final Configuration conf = this.getConf();
+    try {
       clientRMProxy =
           user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
             @Override
@@ -127,16 +111,9 @@ public class DefaultClientRequestInterceptor
                   ApplicationClientProtocol.class);
             }
           });
-    } catch (IOException e) {
-      String message = "Error while creating Router ClientRM Service for user:";
-      if (user != null) {
-        message += ", user: " + user;
-      }
-
-      LOG.info(message);
-      throw new YarnRuntimeException(message, e);
     } catch (Exception e) {
-      throw new YarnRuntimeException(e);
+      throw new YarnRuntimeException(
+          "Unable to create the interface to reach the YarnRM", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
new file mode 100644
index 0000000..ecf53ac
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -0,0 +1,684 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodeLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewReservationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.MoveApplicationAcrossQueuesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationPriorityResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.UpdateApplicationTimeoutsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.router.RouterServerUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Extends the {@code AbstractRequestInterceptorClient} class and provides an
+ * implementation for federation of YARN RM and scaling an application across
+ * multiple YARN SubClusters. All the federation specific implementation is
+ * encapsulated in this class. This is always the last intercepter in the chain.
+ */
+public class FederationClientInterceptor
+    extends AbstractClientRequestInterceptor {
+
+  /*
+   * TODO YARN-6740 Federation Router (hiding multiple RMs for
+   * ApplicationClientProtocol) phase 2.
+   *
+   * The current implementation finalized the main 4 calls (getNewApplication,
+   * submitApplication, forceKillApplication and getApplicationReport). Those
+   * allow us to execute applications E2E.
+   */
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(FederationClientInterceptor.class);
+
+  private int numSubmitRetries;
+  private Map<SubClusterId, ApplicationClientProtocol> clientRMProxies;
+  private FederationStateStoreFacade federationFacade;
+  private Random rand;
+  private RouterPolicyFacade policyFacade;
+
+  @Override
+  public void init(String userName) {
+    super.init(userName);
+
+    federationFacade = FederationStateStoreFacade.getInstance();
+    rand = new Random(System.currentTimeMillis());
+
+    final Configuration conf = this.getConf();
+
+    try {
+      policyFacade = new RouterPolicyFacade(conf, federationFacade,
+          this.federationFacade.getSubClusterResolver(), null);
+    } catch (FederationPolicyInitializationException e) {
+      LOG.error(e.getMessage());
+    }
+
+    numSubmitRetries =
+        conf.getInt(YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_RETRY,
+            YarnConfiguration.DEFAULT_ROUTER_CLIENTRM_SUBMIT_RETRY);
+
+    clientRMProxies =
+        new ConcurrentHashMap<SubClusterId, ApplicationClientProtocol>();
+
+  }
+
+  @Override
+  public void setNextInterceptor(ClientRequestInterceptor next) {
+    throw new YarnRuntimeException("setNextInterceptor is being called on "
+        + "FederationClientRequestInterceptor, which should be the last one "
+        + "in the chain. Check if the interceptor pipeline configuration "
+        + "is correct");
+  }
+
+  @VisibleForTesting
+  protected ApplicationClientProtocol getClientRMProxyForSubCluster(
+      SubClusterId subClusterId) throws YarnException {
+
+    if (clientRMProxies.containsKey(subClusterId)) {
+      return clientRMProxies.get(subClusterId);
+    }
+
+    ApplicationClientProtocol clientRMProxy = null;
+    try {
+      clientRMProxy =
+          user.doAs(new PrivilegedExceptionAction<ApplicationClientProtocol>() {
+            @Override
+            public ApplicationClientProtocol run() throws Exception {
+              return ClientRMProxy.createRMProxy(getConf(),
+                  ApplicationClientProtocol.class);
+            }
+          });
+    } catch (Exception e) {
+      RouterServerUtil.logAndThrowException(
+          "Unable to create the interface to reach the SubCluster "
+              + subClusterId,
+          e);
+    }
+
+    clientRMProxies.put(subClusterId, clientRMProxy);
+    return clientRMProxy;
+  }
+
+  private SubClusterId getRandomActiveSubCluster(
+      Map<SubClusterId, SubClusterInfo> activeSubclusters)
+      throws YarnException {
+
+    if (activeSubclusters == null || activeSubclusters.size() < 1) {
+      RouterServerUtil.logAndThrowException(
+          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
+    }
+    List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+
+    return list.get(rand.nextInt(list.size()));
+  }
+
+  /**
+   * Yarn Router forwards every getNewApplication requests to any RM. During
+   * this operation there will be no communication with the State Store. The
+   * Router will forward the requests to any SubCluster. The Router will retry
+   * to submit the request on #numSubmitRetries different SubClusters. The
+   * SubClusters are randomly chosen from the active ones.
+   *
+   * Possible failures and behaviors:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will timeout and resubmit.
+   *
+   * ResourceManager: the Router will timeout and contacts another RM.
+   *
+   * StateStore: not in the execution.
+   */
+  @Override
+  public GetNewApplicationResponse getNewApplication(
+      GetNewApplicationRequest request) throws YarnException, IOException {
+    Map<SubClusterId, SubClusterInfo> subClustersActive =
+        federationFacade.getSubClusters(true);
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+      SubClusterId subClusterId = getRandomActiveSubCluster(subClustersActive);
+      LOG.debug(
+          "getNewApplication try #" + i + " on SubCluster " + subClusterId);
+      ApplicationClientProtocol clientRMProxy =
+          getClientRMProxyForSubCluster(subClusterId);
+      GetNewApplicationResponse response = null;
+      try {
+        response = clientRMProxy.getNewApplication(request);
+      } catch (Exception e) {
+        LOG.warn("Unable to create a new ApplicationId in SubCluster "
+            + subClusterId.getId(), e);
+      }
+
+      if (response != null) {
+        return response;
+      } else {
+        // Empty response from the ResourceManager.
+        // Blacklist this subcluster for this request.
+        subClustersActive.remove(subClusterId);
+      }
+
+    }
+
+    String errMsg = "Fail to create a new application.";
+    LOG.error(errMsg);
+    throw new YarnException(errMsg);
+  }
+
+  /**
+   * Today, in YARN there are no checks of any applicationId submitted.
+   *
+   * Base scenarios:
+   *
+   * The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into
+   * StateStore with the selected SubCluster (e.g. SC1) and the appId. • The
+   * State Store replies with the selected SubCluster (e.g. SC1). • The Router
+   * submits the request to the selected SubCluster.
+   *
+   * In case of State Store failure:
+   *
+   * The client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • Due to the
+   * State Store down the Router times out and it will retry depending on the
+   * FederationFacade settings. • The Router replies to the client with an error
+   * message.
+   *
+   * If State Store fails after inserting the tuple: identical behavior as
+   * {@code ClientRMService}.
+   *
+   * In case of Router failure:
+   *
+   * Scenario 1 – Crash before submission to the ResourceManager
+   *
+   * The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
+   * crashes. • The Client timeouts and resubmits the application. • The Router
+   * selects one SubCluster to forward the request. • The Router inserts a tuple
+   * into State Store with the selected SubCluster (e.g. SC2) and the appId. •
+   * Because the tuple is already inserted in the State Store, it returns the
+   * previous selected SubCluster (e.g. SC1). • The Router submits the request
+   * to the selected SubCluster (e.g. SC1).
+   *
+   * Scenario 2 – Crash after submission to the ResourceManager
+   *
+   * • The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
+   * submits the request to the selected SubCluster. • The Router crashes. • The
+   * Client timeouts and resubmit the application. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC2) and the appId. • The State
+   * Store replies with the selected SubCluster (e.g. SC1). • The Router submits
+   * the request to the selected SubCluster (e.g. SC1). When a client re-submits
+   * the same application to the same RM, it does not raise an exception and
+   * replies with operation successful message.
+   *
+   * In case of Client failure: identical behavior as {@code ClientRMService}.
+   *
+   * In case of ResourceManager failure:
+   *
+   * The Client submits an application to the Router. • The Router selects one
+   * SubCluster to forward the request. • The Router inserts a tuple into State
+   * Store with the selected SubCluster (e.g. SC1) and the appId. • The Router
+   * submits the request to the selected SubCluster. • The entire SubCluster is
+   * down – all the RMs in HA or the master RM is not reachable. • The Router
+   * times out. • The Router selects a new SubCluster to forward the request. •
+   * The Router update a tuple into State Store with the selected SubCluster
+   * (e.g. SC2) and the appId. • The State Store replies with OK answer. • The
+   * Router submits the request to the selected SubCluster (e.g. SC2).
+   */
+  @Override
+  public SubmitApplicationResponse submitApplication(
+      SubmitApplicationRequest request) throws YarnException, IOException {
+    if (request == null || request.getApplicationSubmissionContext() == null
+        || request.getApplicationSubmissionContext()
+            .getApplicationId() == null) {
+      RouterServerUtil
+          .logAndThrowException("Missing submitApplication request or "
+              + "applicationSubmissionContex information.", null);
+    }
+
+    ApplicationId applicationId =
+        request.getApplicationSubmissionContext().getApplicationId();
+
+    List<SubClusterId> blacklist = new ArrayList<SubClusterId>();
+
+    for (int i = 0; i < numSubmitRetries; ++i) {
+
+      SubClusterId subClusterId = policyFacade.getHomeSubcluster(
+          request.getApplicationSubmissionContext(), blacklist);
+      LOG.info("submitApplication appId" + applicationId + " try #" + i
+          + " on SubCluster " + subClusterId);
+
+      ApplicationHomeSubCluster appHomeSubCluster =
+          ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+
+      if (i == 0) {
+        try {
+          // persist the mapping of applicationId and the subClusterId which has
+          // been selected as its home
+          subClusterId =
+              federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String message = "Unable to insert the ApplicationId " + applicationId
+              + " into the FederationStateStore";
+          RouterServerUtil.logAndThrowException(message, e);
+        }
+      } else {
+        try {
+          // update the mapping of applicationId and the home subClusterId to
+          // the new subClusterId we have selected
+          federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
+        } catch (YarnException e) {
+          String message = "Unable to update the ApplicationId " + applicationId
+              + " into the FederationStateStore";
+          SubClusterId subClusterIdInStateStore =
+              federationFacade.getApplicationHomeSubCluster(applicationId);
+          if (subClusterId == subClusterIdInStateStore) {
+            LOG.info("Application " + applicationId
+                + " already submitted on SubCluster " + subClusterId);
+          } else {
+            RouterServerUtil.logAndThrowException(message, e);
+          }
+        }
+      }
+
+      ApplicationClientProtocol clientRMProxy =
+          getClientRMProxyForSubCluster(subClusterId);
+
+      SubmitApplicationResponse response = null;
+      try {
+        response = clientRMProxy.submitApplication(request);
+      } catch (Exception e) {
+        LOG.warn("Unable to submit the application " + applicationId
+            + "to SubCluster " + subClusterId.getId(), e);
+      }
+
+      if (response != null) {
+        LOG.info("Application "
+            + request.getApplicationSubmissionContext().getApplicationName()
+            + " with appId " + applicationId + " submitted on " + subClusterId);
+        return response;
+      } else {
+        // Empty response from the ResourceManager.
+        // Blacklist this subcluster for this request.
+        blacklist.add(subClusterId);
+      }
+    }
+
+    String errMsg = "Application "
+        + request.getApplicationSubmissionContext().getApplicationName()
+        + " with appId " + applicationId + " failed to be submitted.";
+    LOG.error(errMsg);
+    throw new YarnException(errMsg);
+  }
+
+  /**
+   * The Yarn Router will forward to the respective Yarn RM in which the AM is
+   * running.
+   *
+   * Possible failures and behaviors:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will timeout and resubmit the request.
+   *
+   * ResourceManager: the Router will timeout and the call will fail.
+   *
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
+  @Override
+  public KillApplicationResponse forceKillApplication(
+      KillApplicationRequest request) throws YarnException, IOException {
+
+    if (request == null || request.getApplicationId() == null) {
+      RouterServerUtil.logAndThrowException(
+          "Missing forceKillApplication request or ApplicationId.", null);
+    }
+    ApplicationId applicationId = request.getApplicationId();
+    SubClusterId subClusterId = null;
+
+    try {
+      subClusterId = federationFacade
+          .getApplicationHomeSubCluster(request.getApplicationId());
+    } catch (YarnException e) {
+      RouterServerUtil.logAndThrowException("Application " + applicationId
+          + " does not exist in FederationStateStore", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy =
+        getClientRMProxyForSubCluster(subClusterId);
+
+    KillApplicationResponse response = null;
+    try {
+      LOG.info("forceKillApplication " + applicationId + " on SubCluster "
+          + subClusterId);
+      response = clientRMProxy.forceKillApplication(request);
+    } catch (Exception e) {
+      LOG.error("Unable to kill the application report for "
+          + request.getApplicationId() + "to SubCluster "
+          + subClusterId.getId(), e);
+      throw e;
+    }
+
+    if (response == null) {
+      LOG.error("No response when attempting to kill the application "
+          + applicationId + " to SubCluster " + subClusterId.getId());
+    }
+
+    return response;
+  }
+
+  /**
+   * The Yarn Router will forward to the respective Yarn RM in which the AM is
+   * running.
+   *
+   * Possible failure:
+   *
+   * Client: identical behavior as {@code ClientRMService}.
+   *
+   * Router: the Client will timeout and resubmit the request.
+   *
+   * ResourceManager: the Router will timeout and the call will fail.
+   *
+   * State Store: the Router will timeout and it will retry depending on the
+   * FederationFacade settings - if the failure happened before the select
+   * operation.
+   */
+  @Override
+  public GetApplicationReportResponse getApplicationReport(
+      GetApplicationReportRequest request) throws YarnException, IOException {
+
+    if (request == null || request.getApplicationId() == null) {
+      RouterServerUtil.logAndThrowException(
+          "Missing getApplicationReport request or applicationId information.",
+          null);
+    }
+
+    SubClusterId subClusterId = null;
+
+    try {
+      subClusterId = federationFacade
+          .getApplicationHomeSubCluster(request.getApplicationId());
+    } catch (YarnException e) {
+      RouterServerUtil
+          .logAndThrowException("Application " + request.getApplicationId()
+              + " does not exist in FederationStateStore", e);
+    }
+
+    ApplicationClientProtocol clientRMProxy =
+        getClientRMProxyForSubCluster(subClusterId);
+
+    GetApplicationReportResponse response = null;
+    try {
+      response = clientRMProxy.getApplicationReport(request);
+    } catch (Exception e) {
+      LOG.error("Unable to get the application report for "
+          + request.getApplicationId() + "to SubCluster "
+          + subClusterId.getId(), e);
+      throw e;
+    }
+
+    if (response == null) {
+      LOG.error("No response when attempting to retrieve the report of "
+          + "the application " + request.getApplicationId() + " to SubCluster "
+          + subClusterId.getId());
+    }
+
+    return response;
+  }
+
+  @Override
+  public GetApplicationsResponse getApplications(GetApplicationsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterMetricsResponse getClusterMetrics(
+      GetClusterMetricsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodesResponse getClusterNodes(GetClusterNodesRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueInfoResponse getQueueInfo(GetQueueInfoRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetQueueUserAclsInfoResponse getQueueUserAcls(
+      GetQueueUserAclsInfoRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public MoveApplicationAcrossQueuesResponse moveApplicationAcrossQueues(
+      MoveApplicationAcrossQueuesRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetNewReservationResponse getNewReservation(
+      GetNewReservationRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationSubmissionResponse submitReservation(
+      ReservationSubmissionRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationListResponse listReservations(
+      ReservationListRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationUpdateResponse updateReservation(
+      ReservationUpdateRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public ReservationDeleteResponse deleteReservation(
+      ReservationDeleteRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetNodesToLabelsResponse getNodeToLabels(
+      GetNodesToLabelsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetLabelsToNodesResponse getLabelsToNodes(
+      GetLabelsToNodesRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetClusterNodeLabelsResponse getClusterNodeLabels(
+      GetClusterNodeLabelsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationAttemptReportResponse getApplicationAttemptReport(
+      GetApplicationAttemptReportRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetApplicationAttemptsResponse getApplicationAttempts(
+      GetApplicationAttemptsRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainerReportResponse getContainerReport(
+      GetContainerReportRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetContainersResponse getContainers(GetContainersRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public GetDelegationTokenResponse getDelegationToken(
+      GetDelegationTokenRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public RenewDelegationTokenResponse renewDelegationToken(
+      RenewDelegationTokenRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public CancelDelegationTokenResponse cancelDelegationToken(
+      CancelDelegationTokenRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public FailApplicationAttemptResponse failApplicationAttempt(
+      FailApplicationAttemptRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public UpdateApplicationPriorityResponse updateApplicationPriority(
+      UpdateApplicationPriorityRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public SignalContainerResponse signalToContainer(
+      SignalContainerRequest request) throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public UpdateApplicationTimeoutsResponse updateApplicationTimeouts(
+      UpdateApplicationTimeoutsRequest request)
+      throws YarnException, IOException {
+    throw new NotImplementedException();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/52daa6d9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
index 7e15084..7fc4719 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/BaseRouterClientRMTest.java
@@ -119,29 +119,41 @@ public abstract class BaseRouterClientRMTest {
     return this.clientrmService;
   }
 
-  @Before
-  public void setUp() {
-    this.conf = new YarnConfiguration();
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration config = new YarnConfiguration();
     String mockPassThroughInterceptorClass =
         PassThroughClientRequestInterceptor.class.getName();
 
     // Create a request intercepter pipeline for testing. The last one in the
     // chain will call the mock resource manager. The others in the chain will
     // simply forward it to the next one in the chain
-    this.conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+    config.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
         mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
             + "," + mockPassThroughInterceptorClass + ","
             + MockClientRequestInterceptor.class.getName());
 
-    this.conf.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
+    config.setInt(YarnConfiguration.ROUTER_PIPELINE_CACHE_MAX_SIZE,
         TEST_MAX_CACHE_SIZE);
+    return config;
+  }
 
+  @Before
+  public void setUp() {
+    this.conf = createConfiguration();
     this.dispatcher = new AsyncDispatcher();
     this.dispatcher.init(conf);
     this.dispatcher.start();
     this.clientrmService = createAndStartRouterClientRMService();
   }
 
+  public void setUpConfig() {
+    this.conf = createConfiguration();
+  }
+
+  protected Configuration getConf() {
+    return this.conf;
+  }
+
   @After
   public void tearDown() {
     if (clientrmService != null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org