You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/07/14 19:03:48 UTC
[46/51] [abbrv] hadoop git commit: YARN-3659. Federation: routing
client invocations transparently to multiple RMs. (Giovanni Matteo Fumarola
via Subru).
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d94d41c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
new file mode 100644
index 0000000..87dfc95
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptor.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
+ * use the {@code RouterClientRMService} pipeline test cases for testing the
+ * {@code FederationInterceptor} class. The tests for
+ * {@code RouterClientRMService} has been written cleverly so that it can be
+ * reused to validate different request intercepter chains.
+ */
+public class TestFederationClientInterceptor extends BaseRouterClientRMTest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestFederationClientInterceptor.class);
+
+ private TestableFederationClientInterceptor interceptor;
+ private MemoryFederationStateStore stateStore;
+ private FederationStateStoreTestUtil stateStoreUtil;
+ private List<SubClusterId> subClusters;
+
+ private String user = "test-user";
+
+ private final static int NUM_SUBCLUSTER = 4;
+
+ @Override
+ public void setUp() {
+ super.setUpConfig();
+ interceptor = new TestableFederationClientInterceptor();
+
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(this.getConf());
+ FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+ getConf());
+ stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+
+ interceptor.setConf(this.getConf());
+ interceptor.init(user);
+
+ subClusters = new ArrayList<SubClusterId>();
+
+ try {
+ for (int i = 0; i < NUM_SUBCLUSTER; i++) {
+ SubClusterId sc = SubClusterId.newInstance(Integer.toString(i));
+ stateStoreUtil.registerSubCluster(sc);
+ subClusters.add(sc);
+ }
+ } catch (YarnException e) {
+ LOG.error(e.getMessage());
+ Assert.fail();
+ }
+
+ }
+
+ @Override
+ public void tearDown() {
+ interceptor.shutdown();
+ super.tearDown();
+ }
+
+ @Override
+ protected YarnConfiguration createConfiguration() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+ String mockPassThroughInterceptorClass =
+ PassThroughClientRequestInterceptor.class.getName();
+
+ // Create a request intercepter pipeline for testing. The last one in the
+ // chain is the federation intercepter that calls the mock resource manager.
+ // The others in the chain will simply forward it to the next one in the
+ // chain
+ conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+ mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ + "," + TestableFederationClientInterceptor.class.getName());
+
+ conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+ UniformBroadcastPolicyManager.class.getName());
+
+ // Disable StateStoreFacade cache
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+ return conf;
+ }
+
+ /**
+ * This test validates the correctness of GetNewApplication. The return
+ * ApplicationId has to belong to one of the SubCluster in the cluster.
+ */
+ @Test
+ public void testGetNewApplication()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println("Test FederationClientInterceptor: Get New Application");
+
+ GetNewApplicationRequest request = GetNewApplicationRequest.newInstance();
+ GetNewApplicationResponse response = interceptor.getNewApplication(request);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(response.getApplicationId());
+ Assert.assertTrue(
+ response.getApplicationId().getClusterTimestamp() < NUM_SUBCLUSTER);
+ Assert.assertTrue(response.getApplicationId().getClusterTimestamp() >= 0);
+ }
+
+ /**
+ * This test validates the correctness of SubmitApplication. The application
+ * has to be submitted to one of the SubCluster in the cluster.
+ */
+ @Test
+ public void testSubmitApplication()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println("Test FederationClientInterceptor: Submit Application");
+
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationSubmissionContext context = ApplicationSubmissionContext
+ .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+ SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(context);
+
+ SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+ Assert.assertNotNull(response);
+ SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
+ Assert.assertNotNull(scIdResult);
+ Assert.assertTrue(subClusters.contains(scIdResult));
+ }
+
+ /**
+ * This test validates the correctness of SubmitApplication in case of
+ * multiple submission. The first retry has to be submitted to the same
+ * SubCluster of the first attempt.
+ */
+ @Test
+ public void testSubmitApplicationMultipleSubmission()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println(
+ "Test FederationClientInterceptor: Submit Application - Multiple");
+
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationSubmissionContext context = ApplicationSubmissionContext
+ .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+ SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(context);
+
+ // First attempt
+ SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+ Assert.assertNotNull(response);
+ SubClusterId scIdResult = stateStoreUtil.queryApplicationHomeSC(appId);
+ Assert.assertNotNull(scIdResult);
+
+ // First retry
+ response = interceptor.submitApplication(request);
+
+ Assert.assertNotNull(response);
+ SubClusterId scIdResult2 = stateStoreUtil.queryApplicationHomeSC(appId);
+ Assert.assertNotNull(scIdResult2);
+ Assert.assertEquals(scIdResult, scIdResult);
+ }
+
+ /**
+ * This test validates the correctness of SubmitApplication in case of empty
+ * request.
+ */
+ @Test
+ public void testSubmitApplicationEmptyRequest()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println(
+ "Test FederationClientInterceptor: Submit Application - Empty");
+ try {
+ interceptor.submitApplication(null);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(
+ e.getMessage().startsWith("Missing submitApplication request or "
+ + "applicationSubmissionContex information."));
+ }
+ try {
+ interceptor.submitApplication(SubmitApplicationRequest.newInstance(null));
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(
+ e.getMessage().startsWith("Missing submitApplication request or "
+ + "applicationSubmissionContex information."));
+ }
+ try {
+ ApplicationSubmissionContext context = ApplicationSubmissionContext
+ .newInstance(null, "", "", null, null, false, false, -1, null, null);
+ SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(context);
+ interceptor.submitApplication(request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(
+ e.getMessage().startsWith("Missing submitApplication request or "
+ + "applicationSubmissionContex information."));
+ }
+ }
+
+ /**
+ * This test validates the correctness of ForceKillApplication in case the
+ * application exists in the cluster.
+ */
+ @Test
+ public void testForceKillApplication()
+ throws YarnException, IOException, InterruptedException {
+ System.out
+ .println("Test FederationClientInterceptor: Force Kill Application");
+
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationSubmissionContext context = ApplicationSubmissionContext
+ .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+
+ SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(context);
+ // Submit the application we are going to kill later
+ SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ KillApplicationRequest requestKill =
+ KillApplicationRequest.newInstance(appId);
+ KillApplicationResponse responseKill =
+ interceptor.forceKillApplication(requestKill);
+ Assert.assertNotNull(responseKill);
+ }
+
+ /**
+ * This test validates the correctness of ForceKillApplication in case of
+ * application does not exist in StateStore.
+ */
+ @Test
+ public void testForceKillApplicationNotExists()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println("Test FederationClientInterceptor: "
+ + "Force Kill Application - Not Exists");
+
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ KillApplicationRequest requestKill =
+ KillApplicationRequest.newInstance(appId);
+ try {
+ interceptor.forceKillApplication(requestKill);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().equals(
+ "Application " + appId + " does not exist in FederationStateStore"));
+ }
+ }
+
+ /**
+ * This test validates the correctness of ForceKillApplication in case of
+ * empty request.
+ */
+ @Test
+ public void testForceKillApplicationEmptyRequest()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println(
+ "Test FederationClientInterceptor: Force Kill Application - Empty");
+ try {
+ interceptor.forceKillApplication(null);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ "Missing forceKillApplication request or ApplicationId."));
+ }
+ try {
+ interceptor
+ .forceKillApplication(KillApplicationRequest.newInstance(null));
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().startsWith(
+ "Missing forceKillApplication request or ApplicationId."));
+ }
+ }
+
+ /**
+ * This test validates the correctness of GetApplicationReport in case the
+ * application exists in the cluster.
+ */
+ @Test
+ public void testGetApplicationReport()
+ throws YarnException, IOException, InterruptedException {
+ System.out
+ .println("Test FederationClientInterceptor: Get Application Report");
+
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationSubmissionContext context = ApplicationSubmissionContext
+ .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+
+ SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(context);
+ // Submit the application we want the report later
+ SubmitApplicationResponse response = interceptor.submitApplication(request);
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(stateStoreUtil.queryApplicationHomeSC(appId));
+
+ GetApplicationReportRequest requestGet =
+ GetApplicationReportRequest.newInstance(appId);
+
+ GetApplicationReportResponse responseGet =
+ interceptor.getApplicationReport(requestGet);
+
+ Assert.assertNotNull(responseGet);
+ }
+
+ /**
+ * This test validates the correctness of GetApplicationReport in case the
+ * application does not exist in StateStore.
+ */
+ @Test
+ public void testGetApplicationNotExists()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println(
+ "Test ApplicationClientProtocol: Get Application Report - Not Exists");
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ GetApplicationReportRequest requestGet =
+ GetApplicationReportRequest.newInstance(appId);
+ try {
+ interceptor.getApplicationReport(requestGet);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage().equals(
+ "Application " + appId + " does not exist in FederationStateStore"));
+ }
+ }
+
+ /**
+ * This test validates the correctness of GetApplicationReport in case of
+ * empty request.
+ */
+ @Test
+ public void testGetApplicationEmptyRequest()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println(
+ "Test FederationClientInterceptor: Get Application Report - Empty");
+ try {
+ interceptor.getApplicationReport(null);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(
+ e.getMessage().startsWith("Missing getApplicationReport request or "
+ + "applicationId information."));
+ }
+ try {
+ interceptor
+ .getApplicationReport(GetApplicationReportRequest.newInstance(null));
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(
+ e.getMessage().startsWith("Missing getApplicationReport request or "
+ + "applicationId information."));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d94d41c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
new file mode 100644
index 0000000..a655c16
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
@@ -0,0 +1,295 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
+import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Extends the {@code BaseRouterClientRMTest} and overrides methods in order to
+ * use the {@code RouterClientRMService} pipeline test cases for testing the
+ * {@code FederationInterceptor} class. The tests for
+ * {@code RouterClientRMService} has been written cleverly so that it can be
+ * reused to validate different request intercepter chains.
+ *
+ * It tests the case with SubClusters down and the Router logic of retries. We
+ * have 1 good SubCluster and 2 bad ones for all the tests.
+ */
+public class TestFederationClientInterceptorRetry
+ extends BaseRouterClientRMTest {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);
+
+ private TestableFederationClientInterceptor interceptor;
+ private MemoryFederationStateStore stateStore;
+ private FederationStateStoreTestUtil stateStoreUtil;
+
+ private String user = "test-user";
+
+ // running and registered
+ private static SubClusterId good;
+
+ // registered but not running
+ private static SubClusterId bad1;
+ private static SubClusterId bad2;
+
+ private static List<SubClusterId> scs = new ArrayList<SubClusterId>();
+
+ @Override
+ public void setUp() {
+ super.setUpConfig();
+ interceptor = new TestableFederationClientInterceptor();
+
+ stateStore = new MemoryFederationStateStore();
+ stateStore.init(this.getConf());
+ FederationStateStoreFacade.getInstance().reinitialize(stateStore,
+ getConf());
+ stateStoreUtil = new FederationStateStoreTestUtil(stateStore);
+
+ interceptor.setConf(this.getConf());
+ interceptor.init(user);
+
+ // Create SubClusters
+ good = SubClusterId.newInstance("0");
+ bad1 = SubClusterId.newInstance("1");
+ bad2 = SubClusterId.newInstance("2");
+ scs.add(good);
+ scs.add(bad1);
+ scs.add(bad2);
+
+ // The mock RM will not start in these SubClusters, this is done to simulate
+ // a SubCluster down
+
+ interceptor.registerBadSubCluster(bad1);
+ interceptor.registerBadSubCluster(bad2);
+ }
+
+ @Override
+ public void tearDown() {
+ interceptor.shutdown();
+ super.tearDown();
+ }
+
+ private void setupCluster(List<SubClusterId> scsToRegister)
+ throws YarnException {
+
+ try {
+ // Clean up the StateStore before every test
+ stateStoreUtil.deregisterAllSubClusters();
+
+ for (SubClusterId sc : scsToRegister) {
+ stateStoreUtil.registerSubCluster(sc);
+ }
+ } catch (YarnException e) {
+ LOG.error(e.getMessage());
+ Assert.fail();
+ }
+ }
+
+ @Override
+ protected YarnConfiguration createConfiguration() {
+ YarnConfiguration conf = new YarnConfiguration();
+ conf.setBoolean(YarnConfiguration.FEDERATION_ENABLED, true);
+ String mockPassThroughInterceptorClass =
+ PassThroughClientRequestInterceptor.class.getName();
+
+ // Create a request intercepter pipeline for testing. The last one in the
+ // chain is the federation intercepter that calls the mock resource manager.
+ // The others in the chain will simply forward it to the next one in the
+ // chain
+ conf.set(YarnConfiguration.ROUTER_CLIENTRM_INTERCEPTOR_CLASS_PIPELINE,
+ mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
+ + "," + TestableFederationClientInterceptor.class.getName());
+
+ conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
+ UniformBroadcastPolicyManager.class.getName());
+
+ // Disable StateStoreFacade cache
+ conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
+
+ return conf;
+ }
+
+ /**
+ * This test validates the correctness of GetNewApplication in case the
+ * cluster is composed of only 1 bad SubCluster.
+ */
+ @Test
+ public void testGetNewApplicationOneBadSC()
+ throws YarnException, IOException, InterruptedException {
+
+ System.out.println("Test getNewApplication with one bad SubCluster");
+ setupCluster(Arrays.asList(bad2));
+
+ try {
+ interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
+ Assert.fail();
+ } catch (Exception e) {
+ System.out.println(e.toString());
+ Assert.assertTrue(e.getMessage()
+ .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+ }
+ }
+
+ /**
+ * This test validates the correctness of GetNewApplication in case the
+ * cluster is composed of only 2 bad SubClusters.
+ */
+ @Test
+ public void testGetNewApplicationTwoBadSCs()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println("Test getNewApplication with two bad SubClusters");
+ setupCluster(Arrays.asList(bad1, bad2));
+
+ try {
+ interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
+ Assert.fail();
+ } catch (Exception e) {
+ System.out.println(e.toString());
+ Assert.assertTrue(e.getMessage()
+ .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+ }
+ }
+
+ /**
+ * This test validates the correctness of GetNewApplication in case the
+ * cluster is composed of only 1 bad SubCluster and 1 good one.
+ */
+ @Test
+ public void testGetNewApplicationOneBadOneGood()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println("Test getNewApplication with one bad, one good SC");
+ setupCluster(Arrays.asList(good, bad2));
+ GetNewApplicationResponse response = null;
+ try {
+ response =
+ interceptor.getNewApplication(GetNewApplicationRequest.newInstance());
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ Assert.assertEquals(Integer.parseInt(good.getId()),
+ response.getApplicationId().getClusterTimestamp());
+ }
+
+ /**
+ * This test validates the correctness of SubmitApplication in case the
+ * cluster is composed of only 1 bad SubCluster.
+ */
+ @Test
+ public void testSubmitApplicationOneBadSC()
+ throws YarnException, IOException, InterruptedException {
+
+ System.out.println("Test submitApplication with one bad SubCluster");
+ setupCluster(Arrays.asList(bad2));
+
+ final ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+ ApplicationSubmissionContext context = ApplicationSubmissionContext
+ .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+ final SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(context);
+ try {
+ interceptor.submitApplication(request);
+ Assert.fail();
+ } catch (Exception e) {
+ System.out.println(e.toString());
+ Assert.assertTrue(e.getMessage()
+ .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+ }
+ }
+
+ /**
+ * This test validates the correctness of SubmitApplication in case the
+ * cluster is composed of only 2 bad SubClusters.
+ */
+ @Test
+ public void testSubmitApplicationTwoBadSCs()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println("Test submitApplication with two bad SubClusters");
+ setupCluster(Arrays.asList(bad1, bad2));
+
+ final ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+ ApplicationSubmissionContext context = ApplicationSubmissionContext
+ .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+ final SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(context);
+ try {
+ interceptor.submitApplication(request);
+ Assert.fail();
+ } catch (Exception e) {
+ System.out.println(e.toString());
+ Assert.assertTrue(e.getMessage()
+ .equals(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE));
+ }
+ }
+
+ /**
+ * This test validates the correctness of SubmitApplication in case the
+ * cluster is composed of only 1 bad SubCluster and a good one.
+ */
+ @Test
+ public void testSubmitApplicationOneBadOneGood()
+ throws YarnException, IOException, InterruptedException {
+ System.out.println("Test submitApplication with one bad, one good SC");
+ setupCluster(Arrays.asList(good, bad2));
+
+ final ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+ ApplicationSubmissionContext context = ApplicationSubmissionContext
+ .newInstance(appId, "", "", null, null, false, false, -1, null, null);
+ final SubmitApplicationRequest request =
+ SubmitApplicationRequest.newInstance(context);
+ try {
+ interceptor.submitApplication(request);
+ } catch (Exception e) {
+ Assert.fail();
+ }
+ Assert.assertEquals(good,
+ stateStore
+ .getApplicationHomeSubCluster(
+ GetApplicationHomeSubClusterRequest.newInstance(appId))
+ .getApplicationHomeSubCluster().getHomeSubCluster());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d94d41c0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
new file mode 100644
index 0000000..e4a1a42
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestableFederationClientInterceptor.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+
+/**
+ * Extends the FederationClientInterceptor and overrides methods to provide a
+ * testable implementation of FederationClientInterceptor.
+ */
+public class TestableFederationClientInterceptor
+ extends FederationClientInterceptor {
+
+ private ConcurrentHashMap<SubClusterId, MockResourceManagerFacade> mockRMs =
+ new ConcurrentHashMap<>();
+
+ private List<SubClusterId> badSubCluster = new ArrayList<SubClusterId>();
+
+ @Override
+ protected ApplicationClientProtocol getClientRMProxyForSubCluster(
+ SubClusterId subClusterId) throws YarnException {
+
+ MockResourceManagerFacade mockRM = null;
+ synchronized (this) {
+ if (mockRMs.containsKey(subClusterId)) {
+ mockRM = mockRMs.get(subClusterId);
+ } else {
+ mockRM = new MockResourceManagerFacade(super.getConf(), 0,
+ Integer.parseInt(subClusterId.getId()),
+ !badSubCluster.contains(subClusterId));
+ mockRMs.put(subClusterId, mockRM);
+
+ }
+ return mockRM;
+ }
+ }
+
+ /**
+ * For testing purpose, some subclusters has to be down to simulate particular
+ * scenarios as RM Failover, network issues. For this reason we keep track of
+ * these bad subclusters. This method make the subcluster unusable.
+ *
+ * @param badSC the subcluster to make unusable
+ */
+ protected void registerBadSubCluster(SubClusterId badSC) {
+ badSubCluster.add(badSC);
+ if (mockRMs.contains(badSC)) {
+ mockRMs.get(badSC).setRunningMode(false);
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org