You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/07/14 19:03:48 UTC

[46/51] [abbrv] hadoop git commit: YARN-3659. Federation: routing client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola via Subru).

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d94d41c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
new file mode 100644
index 0000000..87dfc95
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
+ * use the {@code RouterClientRMService} pipeline test cases for testing the
+ * {@code FederationClientInterceptor} class. The tests for
+ * {@code RouterClientRMService} have been written cleverly so that they can be
+ * reused to validate different request interceptor chains.
+ */
+public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationClientInterceptor.class);
+
+  private TestableFederationClientInterceptor interceptor;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreTestUtil stateStoreUtil;
+  private List<SubClusterId> subClusters;
+
+  private String user = "test-user";
+
+  private final static int NUM_SUBCLUSTER = 4;
+
+  /**
+   * Builds the interceptor under test on top of a new
+   * {@code MemoryFederationStateStore} and registers {@code NUM_SUBCLUSTER}
+   * SubClusters whose ids are "0" .. "NUM_SUBCLUSTER - 1". A registration
+   * failure fails the test.
+   */
+  @Override
+  public void setUp() {
+    super.setUpConfig();
+    interceptor = new TestableFederationClientInterceptor();
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(this.getConf());
+    FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+        getConf());
+    stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+
+    interceptor.setConf(this.getConf());
+    interceptor.init(user);
+
+    subClusters = new ArrayList<SubClusterId>();
+
+    try {
+      for (int i = 0; i < NUM_SUBCLUSTER; i++) {
+        SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
+        stateStoreUtil.registerSubCluster(sc);
+        subClusters.add(sc);
+      }
+    } catch (YarnException e) {
+      // Keep the stack trace and surface the reason in the test report.
+      LOG.error("Unable to register the SubClusters", e);
+      Assert.fail("Unable to register the SubClusters: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Shuts the interceptor down first, then runs the base-class tear down.
+   */
+  @Override
+  public void tearDown() {
+    this.interceptor.shutdown();
+    super.tearDown();
+  }
+
+  @Override
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    String mockPassThroughInterceptorClass =
+        PassThroughClientRequestInterceptor.class.getName();
+
+    // Create a request intercepter pipeline for testing. The last one in the
+    // chain is the federation intercepter that calls the mock resource manager.
+    // The others in the chain will simply forward it to the next one in the
+    // chain
+    conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+            + "," + TestableFederationClientInterceptor.class.getName());
+
+    conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+        UniformBroadcastPolicyManager.class.getName());
+
+    // Disable StateStoreFacade cache
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+    return conf;
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication. The returned
+   * ApplicationId has to belong to one of the SubClusters in the cluster.
+   */
+  @Test
+  public void testGetNewApplication()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println("Test FederationClientInterceptor: Get New Application");
+
+    GetNewApplicationResponse response =
+        interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
+    Assert.assertNotNull(response);
+
+    ApplicationId newAppId = response.getApplicationId();
+    Assert.assertNotNull(newAppId);
+
+    // NOTE(review): presumably the mock RM uses the SubCluster id as the
+    // cluster timestamp, hence the [0, NUM_SUBCLUSTER) range - confirm.
+    Assert.assertTrue(newAppId.getClusterTimestamp() < NUM_SUBCLUSTER);
+    Assert.assertTrue(newAppId.getClusterTimestamp() >= 0);
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication. The application
+   * has to be submitted to one of the SubClusters in the cluster.
+   */
+  @Test
+  public void testSubmitApplication()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println("Test FederationClientInterceptor: Submit Application");
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(
+        ApplicationSubmissionContext.newInstance(appId, "", "", null, null,
+            false, false, -1, null, null));
+
+    Assert.assertNotNull(interceptor.submitApplication(request));
+
+    // The recorded home SubCluster must be one of the registered ones
+    SubClusterId homeSubCluster = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(homeSubCluster);
+    Assert.assertTrue(subClusters.contains(homeSubCluster));
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case of
+   * multiple submissions. The first retry has to be submitted to the same
+   * SubCluster of the first attempt.
+   */
+  @Test
+  public void testSubmitApplicationMultipleSubmission()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println(
+        "Test FederationClientInterceptor: Submit Application - Multiple");
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    ApplicationSubmissionContext context = ApplicationSubmissionContext
+        .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+    SubmitApplicationRequest request =
+        SubmitApplicationRequest.newInstance(context);
+
+    // First attempt
+    SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult);
+
+    // First retry
+    response = interceptor.submitApplication(request);
+
+    Assert.assertNotNull(response);
+    SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
+    Assert.assertNotNull(scIdResult2);
+    // The retry must not move the application to a different home SubCluster:
+    // compare the value read after the first attempt with the one read after
+    // the retry (not a value with itself, which would always pass).
+    Assert.assertEquals(scIdResult, scIdResult2);
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case of empty
+   * request: a null request, a request without submission context and a
+   * submission context without ApplicationId must all be rejected with the
+   * same error message.
+   */
+  @Test
+  public void testSubmitApplicationEmptyRequest()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println(
+        "Test FederationClientInterceptor: Submit Application - Empty");
+    final String expectedPrefix = "Missing submitApplication request or "
+        + "applicationSubmissionContex information.";
+
+    // Case 1: the request itself is null
+    try {
+      interceptor.submitApplication(null);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().startsWith(expectedPrefix));
+    }
+
+    // Case 2: the request carries no submission context
+    try {
+      interceptor.submitApplication(SubmitApplicationRequest.newInstance(null));
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().startsWith(expectedPrefix));
+    }
+
+    // Case 3: the submission context carries no ApplicationId
+    try {
+      ApplicationSubmissionContext noAppIdContext = ApplicationSubmissionContext
+          .newInstance(null, "", "", null, null, false, false, -1, null, null);
+      interceptor.submitApplication(
+          SubmitApplicationRequest.newInstance(noAppIdContext));
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().startsWith(expectedPrefix));
+    }
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testForceKillApplication()
+      throws YarnException, IOException, InterruptedException {
+    System.out
+        .println("Test FederationClientInterceptor: Force Kill Application");
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    // Submit the application we are going to kill later
+    SubmitApplicationRequest submitRequest =
+        SubmitApplicationRequest.newInstance(
+            ApplicationSubmissionContext.newInstance(appId, "", "", null, null,
+                false, false, -1, null, null));
+    Assert.assertNotNull(interceptor.submitApplication(submitRequest));
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    // Kill it through the interceptor; only a non-null response is required
+    KillApplicationResponse killResponse = interceptor
+        .forceKillApplication(KillApplicationRequest.newInstance(appId));
+    Assert.assertNotNull(killResponse);
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case the
+   * application does not exist in the StateStore: the call has to fail with
+   * a YarnException that names the unknown application.
+   */
+  @Test
+  public void testForceKillApplicationNotExists()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println("Test FederationClientInterceptor: "
+        + "Force Kill Application - Not Exists");
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    KillApplicationRequest requestKill =
+        KillApplicationRequest.newInstance(appId);
+    try {
+      interceptor.forceKillApplication(requestKill);
+      Assert.fail("forceKillApplication is expected to fail for an "
+          + "application unknown to the FederationStateStore");
+    } catch (YarnException e) {
+      // assertEquals reports the actual message when the check fails
+      Assert.assertEquals(
+          "Application " + appId + " does not exist in FederationStateStore",
+          e.getMessage());
+    }
+  }
+
+  /**
+   * This test validates the correctness of ForceKillApplication in case of
+   * empty request: both a null request and a request without ApplicationId
+   * must be rejected with the same error message.
+   */
+  @Test
+  public void testForceKillApplicationEmptyRequest()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println(
+        "Test FederationClientInterceptor: Force Kill Application - Empty");
+    final String expectedPrefix =
+        "Missing forceKillApplication request or ApplicationId.";
+
+    // Case 1: the request itself is null
+    try {
+      interceptor.forceKillApplication(null);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().startsWith(expectedPrefix));
+    }
+
+    // Case 2: the request carries no ApplicationId
+    try {
+      interceptor
+          .forceKillApplication(KillApplicationRequest.newInstance(null));
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().startsWith(expectedPrefix));
+    }
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationReport in case the
+   * application exists in the cluster.
+   */
+  @Test
+  public void testGetApplicationReport()
+      throws YarnException, IOException, InterruptedException {
+    System.out
+        .println("Test FederationClientInterceptor: Get Application Report");
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    // Submit the application we want the report later
+    SubmitApplicationRequest submitRequest =
+        SubmitApplicationRequest.newInstance(
+            ApplicationSubmissionContext.newInstance(appId, "", "", null, null,
+                false, false, -1, null, null));
+    Assert.assertNotNull(interceptor.submitApplication(submitRequest));
+    Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+    // Ask for its report; only a non-null response is required
+    GetApplicationReportResponse reportResponse = interceptor
+        .getApplicationReport(GetApplicationReportRequest.newInstance(appId));
+    Assert.assertNotNull(reportResponse);
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationReport in case the
+   * application does not exist in the StateStore: the call has to fail with
+   * a YarnException that names the unknown application.
+   */
+  @Test
+  public void testGetApplicationNotExists()
+      throws YarnException, IOException, InterruptedException {
+    // Same banner prefix as the other tests of this class
+    System.out.println("Test FederationClientInterceptor: "
+        + "Get Application Report - Not Exists");
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    GetApplicationReportRequest requestGet =
+        GetApplicationReportRequest.newInstance(appId);
+    try {
+      interceptor.getApplicationReport(requestGet);
+      Assert.fail("getApplicationReport is expected to fail for an "
+          + "application unknown to the FederationStateStore");
+    } catch (YarnException e) {
+      // assertEquals reports the actual message when the check fails
+      Assert.assertEquals(
+          "Application " + appId + " does not exist in FederationStateStore",
+          e.getMessage());
+    }
+  }
+
+  /**
+   * This test validates the correctness of GetApplicationReport in case of
+   * empty request: both a null request and a request without ApplicationId
+   * must be rejected with the same error message.
+   */
+  @Test
+  public void testGetApplicationEmptyRequest()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println(
+        "Test FederationClientInterceptor: Get Application Report - Empty");
+    final String expectedPrefix = "Missing getApplicationReport request or "
+        + "applicationId information.";
+
+    // Case 1: the request itself is null
+    try {
+      interceptor.getApplicationReport(null);
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().startsWith(expectedPrefix));
+    }
+
+    // Case 2: the request carries no ApplicationId
+    try {
+      interceptor
+          .getApplicationReport(GetApplicationReportRequest.newInstance(null));
+      Assert.fail();
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().startsWith(expectedPrefix));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d94d41c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
new file mode 100644
index 0000000..a655c16
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
+ * use the {@code RouterClientRMService} pipeline test cases for testing the
+ * {@code FederationClientInterceptor} class. The tests for
+ * {@code RouterClientRMService} have been written cleverly so that they can be
+ * reused to validate different request interceptor chains.
+ *
+ * It tests the case with SubClusters down and the Router logic of retries. We
+ * have 1 good SubCluster and 2 bad ones for all the tests.
+ */
+public class TestFederationClientInterceptorRetry
+    extends BaseRouterClientRMTest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);
+
+  private TestableFederationClientInterceptor interceptor;
+  private MemoryFederationStateStore stateStore;
+  private FederationStateStoreTestUtil stateStoreUtil;
+
+  private String user = "test-user";
+
+  // running and registered
+  private static SubClusterId good;
+
+  // registered but not running
+  private static SubClusterId bad1;
+  private static SubClusterId bad2;
+
+  private static List<SubClusterId> scs = new ArrayList<SubClusterId>();
+
+  /**
+   * Builds the interceptor under test on top of a new
+   * {@code MemoryFederationStateStore} and flags SubClusters "1" and "2" as
+   * bad. No SubCluster is registered in the state store here: each test does
+   * it through {@code setupCluster}.
+   */
+  @Override
+  public void setUp() {
+    super.setUpConfig();
+    interceptor = new TestableFederationClientInterceptor();
+
+    stateStore = new MemoryFederationStateStore();
+    stateStore.init(this.getConf());
+    FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+        getConf());
+    stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+
+    interceptor.setConf(this.getConf());
+    interceptor.init(user);
+
+    // Create SubClusters. The list is static while setUp runs once per test,
+    // so reset it first to avoid piling up duplicates across tests.
+    scs.clear();
+    good = SubClusterId.newInstance("0");
+    bad1 = SubClusterId.newInstance("1");
+    bad2 = SubClusterId.newInstance("2");
+    scs.add(good);
+    scs.add(bad1);
+    scs.add(bad2);
+
+    // The mock RM will not start in these SubClusters, this is done to simulate
+    // a SubCluster down
+
+    interceptor.registerBadSubCluster(bad1);
+    interceptor.registerBadSubCluster(bad2);
+  }
+
+  /**
+   * Shuts the interceptor down first, then runs the base-class tear down.
+   */
+  @Override
+  public void tearDown() {
+    this.interceptor.shutdown();
+    super.tearDown();
+  }
+
+  /**
+   * Resets the SubCluster membership in the state store: every SubCluster is
+   * deregistered, then the given ones are registered. A failure of either
+   * step fails the calling test.
+   *
+   * @param scsToRegister the SubClusters that must be registered
+   * @throws YarnException kept in the signature for the callers; failures
+   *           are turned into a test failure inside the method
+   */
+  private void setupCluster(List<SubClusterId> scsToRegister)
+      throws YarnException {
+
+    try {
+      // Clean up the StateStore before every test
+      stateStoreUtil.deregisterAllSubClusters();
+
+      for (SubClusterId sc : scsToRegister) {
+        stateStoreUtil.registerSubCluster(sc);
+      }
+    } catch (YarnException e) {
+      // Keep the stack trace and surface the reason in the test report.
+      LOG.error("Unable to set up the SubClusters " + scsToRegister, e);
+      Assert.fail("Unable to set up the SubClusters: " + e.getMessage());
+    }
+  }
+
+  @Override
+  protected YarnConfiguration createConfiguration() {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+    String mockPassThroughInterceptorClass =
+        PassThroughClientRequestInterceptor.class.getName();
+
+    // Create a request intercepter pipeline for testing. The last one in the
+    // chain is the federation intercepter that calls the mock resource manager.
+    // The others in the chain will simply forward it to the next one in the
+    // chain
+    conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+        mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+            + "," + TestableFederationClientInterceptor.class.getName());
+
+    conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+        UniformBroadcastPolicyManager.class.getName());
+
+    // Disable StateStoreFacade cache
+    conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+    return conf;
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication in case the
+   * cluster is composed of only 1 bad SubCluster: the call has to fail
+   * because no active SubCluster is available.
+   */
+  @Test
+  public void testGetNewApplicationOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+
+    System.out.println("Test getNewApplication with one bad SubCluster");
+    List<SubClusterId> registered = Arrays.asList(bad2);
+    setupCluster(registered);
+
+    try {
+      interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
+      Assert.fail();
+    } catch (Exception e) {
+      System.out.println(e.toString());
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+    }
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication in case the
+   * cluster is composed of only 2 bad SubClusters: the call has to fail
+   * because no active SubCluster is available.
+   */
+  @Test
+  public void testGetNewApplicationTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println("Test getNewApplication with two bad SubClusters");
+    List<SubClusterId> registered = Arrays.asList(bad1, bad2);
+    setupCluster(registered);
+
+    try {
+      interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
+      Assert.fail();
+    } catch (Exception e) {
+      System.out.println(e.toString());
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+    }
+  }
+
+  /**
+   * This test validates the correctness of GetNewApplication in case the
+   * cluster is composed of only 1 bad SubCluster and 1 good one: the call
+   * has to succeed and the new ApplicationId must carry the id of the good
+   * SubCluster as cluster timestamp.
+   */
+  @Test
+  public void testGetNewApplicationOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println("Test getNewApplication with one bad, one good SC");
+    setupCluster(Arrays.asList(good, bad2));
+    GetNewApplicationResponse response = null;
+    try {
+      response =
+          interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
+    } catch (Exception e) {
+      // Report why the call failed instead of a bare, message-less failure.
+      LOG.error("getNewApplication failed unexpectedly", e);
+      Assert.fail("getNewApplication is expected to succeed: " + e);
+    }
+    // Fail with a clear assertion rather than a NullPointerException below.
+    Assert.assertNotNull(response);
+    Assert.assertNotNull(response.getApplicationId());
+    Assert.assertEquals(Integer.parseInt(good.getId()),
+        response.getApplicationId().getClusterTimestamp());
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case the
+   * cluster is composed of only 1 bad SubCluster: the submission has to fail
+   * because no active SubCluster is available.
+   */
+  @Test
+  public void testSubmitApplicationOneBadSC()
+      throws YarnException, IOException, InterruptedException {
+
+    System.out.println("Test submitApplication with one bad SubCluster");
+    List<SubClusterId> registered = Arrays.asList(bad2);
+    setupCluster(registered);
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(
+        ApplicationSubmissionContext.newInstance(appId, "", "", null, null,
+            false, false, -1, null, null));
+
+    try {
+      interceptor.submitApplication(request);
+      Assert.fail();
+    } catch (Exception e) {
+      System.out.println(e.toString());
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+    }
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case the
+   * cluster is composed of only 2 bad SubClusters: the submission has to fail
+   * because no active SubCluster is available.
+   */
+  @Test
+  public void testSubmitApplicationTwoBadSCs()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println("Test submitApplication with two bad SubClusters");
+    List<SubClusterId> registered = Arrays.asList(bad1, bad2);
+    setupCluster(registered);
+
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    SubmitApplicationRequest request = SubmitApplicationRequest.newInstance(
+        ApplicationSubmissionContext.newInstance(appId, "", "", null, null,
+            false, false, -1, null, null));
+
+    try {
+      interceptor.submitApplication(request);
+      Assert.fail();
+    } catch (Exception e) {
+      System.out.println(e.toString());
+      String message = e.getMessage();
+      Assert.assertTrue(message
+          .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+    }
+  }
+
+  /**
+   * This test validates the correctness of SubmitApplication in case the
+   * cluster is composed of only 1 bad SubCluster and a good one: the
+   * submission has to succeed and the good SubCluster must be recorded as
+   * the home SubCluster of the application.
+   */
+  @Test
+  public void testSubmitApplicationOneBadOneGood()
+      throws YarnException, IOException, InterruptedException {
+    System.out.println("Test submitApplication with one bad, one good SC");
+    setupCluster(Arrays.asList(good, bad2));
+
+    final ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    ApplicationSubmissionContext context = ApplicationSubmissionContext
+        .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+    final SubmitApplicationRequest request =
+        SubmitApplicationRequest.newInstance(context);
+    try {
+      interceptor.submitApplication(request);
+    } catch (Exception e) {
+      // Report why the call failed instead of a bare, message-less failure.
+      LOG.error("submitApplication failed unexpectedly", e);
+      Assert.fail("submitApplication is expected to succeed: " + e);
+    }
+    Assert.assertEquals(good,
+        stateStore
+            .getApplicationHomeSubCluster(
+                GetApplicationHomeSubClusterRequest.newInstance(appId))
+            .getApplicationHomeSubCluster().getHomeSubCluster());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d94d41c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
new file mode 100644
index 0000000..e4a1a42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+/**
+ * Extends the FederationClientInterceptor and overrides methods to provide a
+ * testable implementation of FederationClientInterceptor.
+ */
+public class TestableFederationClientInterceptor
+    extends FederationClientInterceptor {
+
+  private ConcurrentHashMap<SubClusterId, MockResourceManagerFacade> mockRMs =
+      new ConcurrentHashMap<>();
+
+  private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
+
+  @Override
+  protected ApplicationClientProtocol getClientRMProxyForSubCluster(
+      SubClusterId subClusterId) throws YarnException {
+
+    MockResourceManagerFacade mockRM = null;
+    synchronized (this) {
+      if (mockRMs.containsKey(subClusterId)) {
+        mockRM = mockRMs.get(subClusterId);
+      } else {
+        mockRM = new MockResourceManagerFacade(super.getConf(), 0,
+            Integer.parseInt(subClusterId.getId()),
+            !badSubCluster.contains(subClusterId));
+        mockRMs.put(subClusterId, mockRM);
+
+      }
+      return mockRM;
+    }
+  }
+
+  /**
+   * For testing purpose, some subclusters has to be down to simulate particular
+   * scenarios as RM Failover, network issues. For this reason we keep track of
+   * these bad subclusters. This method make the subcluster unusable.
+   *
+   * @param badSC the subcluster to make unusable
+   */
+  protected void registerBadSubCluster(SubClusterId badSC) {
+    badSubCluster.add(badSC);
+    if (mockRMs.contains(badSC)) {
+      mockRMs.get(badSC).setRunningMode(false);
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org