Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/10/26 23:42:32 UTC

[hadoop] branch trunk updated: YARN-11357. Fix FederationClientInterceptor#submitApplication Can't Update SubClusterId (#5055)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ba77530ff4b YARN-11357. Fix FederationClientInterceptor#submitApplication Can't Update SubClusterId (#5055)
ba77530ff4b is described below

commit ba77530ff4b19ae12abeb8546e4db7af855323ce
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Thu Oct 27 07:42:22 2022 +0800

    YARN-11357. Fix FederationClientInterceptor#submitApplication Can't Update SubClusterId (#5055)
---
 .../clientrm/FederationClientInterceptor.java      | 17 +++--
 .../TestFederationClientInterceptorRetry.java      | 74 +++++++++++++++++++-
 .../TestSequentialBroadcastPolicyManager.java      | 39 +++++++++++
 .../clientrm/TestSequentialRouterPolicy.java       | 78 ++++++++++++++++++++++
 4 files changed, 200 insertions(+), 8 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index d2596343a5f..faffbf602be 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -314,7 +314,7 @@ public class FederationClientInterceptor
     // Try calling the getNewApplication method
     List<SubClusterId> blacklist = new ArrayList<>();
     int activeSubClustersCount = getActiveSubClustersCount();
-    int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
+    int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
 
     try {
       GetNewApplicationResponse response =
@@ -470,7 +470,7 @@ public class FederationClientInterceptor
       // but if the number of Active SubClusters is less than this number at this time,
       // we should provide a high number of retry according to the number of Active SubClusters.
       int activeSubClustersCount = getActiveSubClustersCount();
-      int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
+      int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
 
       // Try calling the SubmitApplication method
       SubmitApplicationResponse response =
@@ -484,7 +484,7 @@ public class FederationClientInterceptor
         return response;
       }
 
-    } catch (Exception e){
+    } catch (Exception e) {
       routerMetrics.incrAppsFailedSubmitted();
       RouterServerUtil.logAndThrowException(e.getMessage(), e);
     }
@@ -543,7 +543,7 @@ public class FederationClientInterceptor
       ApplicationHomeSubCluster appHomeSubCluster =
           ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
 
-      if (exists || retryCount == 0) {
+      if (!exists || retryCount == 0) {
         addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
       } else {
         updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
@@ -563,8 +563,8 @@ public class FederationClientInterceptor
     } catch (Exception e) {
       RouterAuditLogger.logFailure(user.getShortUserName(), SUBMIT_NEW_APP, UNKNOWN,
           TARGET_CLIENT_RM_SERVICE, e.getMessage(), applicationId, subClusterId);
-      LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {} error = {}.",
-          applicationId, subClusterId, e);
+      LOG.warn("Unable to submitApplication appId {} try #{} on SubCluster {}.",
+          applicationId, retryCount, subClusterId, e);
       if (subClusterId != null) {
         blackList.add(subClusterId);
       }
@@ -1948,4 +1948,9 @@ public class FederationClientInterceptor
       }
     }
   }
+
+  @VisibleForTesting
+  public void setNumSubmitRetries(int numSubmitRetries) {
+    this.numSubmitRetries = numSubmitRetries;
+  }
 }
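
Two of the hunks above drop the "+ 1" from the retry-budget calculation, so the
computed number of retries can no longer exceed the number of active SubClusters.
Below is a minimal, arithmetic-only sketch of the difference, using hypothetical
values; how the budget drives the surrounding retry loop is not shown here:

    public class RetryBudgetDemo {
      public static void main(String[] args) {
        // Hypothetical values: 3 configured submit retries, only 2 active SubClusters.
        int numSubmitRetries = 3;
        int activeSubClustersCount = 2;
        int before = Math.min(activeSubClustersCount, numSubmitRetries) + 1;
        int after = Math.min(activeSubClustersCount, numSubmitRetries);
        System.out.println(before + " -> " + after); // 3 -> 2: no more than the active SubClusters
      }
    }

The "!exists || retryCount == 0" hunk is the actual "Can't Update SubClusterId"
fix: with the old condition, a retry that already had a home SubCluster mapping
took the add branch, so the mapping was never updated. Below is a self-contained
sketch of the corrected decision; decide() and its printed labels are
placeholders, not Router code:

    public class AddOrUpdateDecisionDemo {
      // Mirrors the condition in submitApplication after this patch.
      static String decide(boolean exists, int retryCount) {
        return (!exists || retryCount == 0) ? "add" : "update";
      }

      public static void main(String[] args) {
        System.out.println(decide(false, 0)); // add    (no mapping yet, first attempt)
        System.out.println(decide(false, 1)); // add    (no mapping yet, on a retry)
        System.out.println(decide(true, 0));  // add    (first attempt)
        System.out.println(decide(true, 2));  // update (retry with an existing mapping;
                                              //         the old condition wrongly chose add)
      }
    }
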
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
index f52c9acbd49..2d0bc6b3507 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestFederationClientInterceptorRetry.java
@@ -18,11 +18,14 @@
 
 package org.apache.hadoop.yarn.server.router.clientrm;
 
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.FEDERATION_POLICY_MANAGER;
+import static org.hamcrest.CoreMatchers.is;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -48,7 +51,11 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreTestUt
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,14 +71,22 @@ import static org.apache.hadoop.yarn.server.federation.policies.FederationPolicy
  * It tests the case with SubClusters down and the Router logic of retries. We
  * have 1 good SubCluster and 2 bad ones for all the tests.
  */
+@RunWith(Parameterized.class)
 public class TestFederationClientInterceptorRetry
     extends BaseRouterClientRMTest {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestFederationClientInterceptorRetry.class);
 
+  @Parameters
+  public static Collection<String[]> getParameters() {
+    return Arrays.asList(new String[][] {{UniformBroadcastPolicyManager.class.getName()},
+        {TestSequentialBroadcastPolicyManager.class.getName()}});
+  }
+
   private TestableFederationClientInterceptor interceptor;
   private MemoryFederationStateStore stateStore;
   private FederationStateStoreTestUtil stateStoreUtil;
+  private String routerPolicyManagerName;
 
   private String user = "test-user";
 
@@ -84,6 +99,10 @@ public class TestFederationClientInterceptorRetry
 
   private static List<SubClusterId> scs = new ArrayList<>();
 
+  public TestFederationClientInterceptorRetry(String policyManagerName) {
+    this.routerPolicyManagerName = policyManagerName;
+  }
+
   @Override
   public void setUp() throws IOException {
     super.setUpConfig();
@@ -150,8 +169,7 @@ public class TestFederationClientInterceptorRetry
         mockPassThroughInterceptorClass + "," + mockPassThroughInterceptorClass
             + "," + TestableFederationClientInterceptor.class.getName());
 
-    conf.set(YarnConfiguration.FEDERATION_POLICY_MANAGER,
-        UniformBroadcastPolicyManager.class.getName());
+    conf.set(FEDERATION_POLICY_MANAGER, this.routerPolicyManagerName);
 
     // Disable StateStoreFacade cache
     conf.setInt(YarnConfiguration.FEDERATION_CACHE_TIME_TO_LIVE_SECS, 0);
@@ -283,4 +301,56 @@ public class TestFederationClientInterceptorRetry
     SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
     Assert.assertEquals(good, respSubClusterId);
   }
+
+  @Test
+  public void testSubmitApplicationTwoBadOneGood() throws Exception {
+
+    LOG.info("Test submitApplication with two bad, one good SC.");
+
+    // This test requires the TestSequentialBroadcastPolicyManager parameter.
+    Assume.assumeThat(routerPolicyManagerName,
+        is(TestSequentialBroadcastPolicyManager.class.getName()));
+
+    setupCluster(Arrays.asList(bad1, bad2, good));
+    final ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+
+    // Use the TestSequentialRouterPolicy strategy, which sorts the
+    // SubClusterIds in descending order: with good=0, bad1=1, bad2=2
+    // the selection order is 2, 1, 0, i.e. [bad2, bad1, good].
+    // Set the retry number to 1:
+    // the 1st attempt goes to bad2 and the 2nd attempt goes to bad1,
+    // so bad1 ends up recorded in the StateStore.
+    interceptor.setNumSubmitRetries(1);
+    final SubmitApplicationRequest request = mockSubmitApplicationRequest(appId);
+    LambdaTestUtils.intercept(YarnException.class, "RM is stopped",
+        () -> interceptor.submitApplication(request));
+
+    // The application's home SubCluster should now be bad1.
+    checkSubmitSubCluster(appId, bad1);
+
+    // Set the retry number to 2:
+    // the 1st attempt goes to bad2, the 2nd to bad1 and the 3rd to good.
+    interceptor.setNumSubmitRetries(2);
+    SubmitApplicationResponse submitAppResponse = interceptor.submitApplication(request);
+    Assert.assertNotNull(submitAppResponse);
+
+    // The application's home SubCluster should now be good.
+    checkSubmitSubCluster(appId, good);
+  }
+
+  private void checkSubmitSubCluster(ApplicationId appId, SubClusterId expectSubCluster)
+      throws YarnException {
+    GetApplicationHomeSubClusterRequest getAppRequest =
+        GetApplicationHomeSubClusterRequest.newInstance(appId);
+    GetApplicationHomeSubClusterResponse getAppResponse =
+        stateStore.getApplicationHomeSubCluster(getAppRequest);
+    Assert.assertNotNull(getAppResponse);
+    ApplicationHomeSubCluster responseHomeSubCluster =
+        getAppResponse.getApplicationHomeSubCluster();
+    Assert.assertNotNull(responseHomeSubCluster);
+    SubClusterId respSubClusterId = responseHomeSubCluster.getHomeSubCluster();
+    Assert.assertEquals(expectSubCluster, respSubClusterId);
+  }
 }
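
The retry test class is now parameterized over the policy-manager class name, so
the whole suite runs once with UniformBroadcastPolicyManager and once with
TestSequentialBroadcastPolicyManager. Below is a standalone illustration of the
JUnit 4 Parameterized pattern used above; the class name, parameter values and
assertion are placeholders:

    import java.util.Arrays;
    import java.util.Collection;

    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class ParameterizedPatternDemo {

      @Parameters
      public static Collection<String[]> getParameters() {
        return Arrays.asList(new String[][] {{"policy-a"}, {"policy-b"}});
      }

      private final String policyName;

      // Each parameter row is passed to the constructor;
      // every @Test method runs once per row.
      public ParameterizedPatternDemo(String policyName) {
        this.policyName = policyName;
      }

      @Test
      public void runsOncePerParameter() {
        // Executed twice: once with "policy-a", once with "policy-b".
        Assert.assertNotNull(policyName);
      }
    }

With a standard Maven build, the suite can usually be selected from the router
module with surefire's -Dtest=TestFederationClientInterceptorRetry filter; both
policy-manager variants then run.
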
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java
new file mode 100644
index 00000000000..dfa8c7136d7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialBroadcastPolicyManager.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.manager.AbstractPolicyManager;
+
+/**
+ * This PolicyManager is used for testing and will contain the
+ * {@link TestSequentialRouterPolicy} policy.
+ *
+ * When testing the FederationClientInterceptor retry logic we want the
+ * SubClusters to be returned in a deterministic order rather than randomly.
+ * See TestSequentialRouterPolicy for a description of that order.
+ */
+public class TestSequentialBroadcastPolicyManager extends AbstractPolicyManager {
+  public TestSequentialBroadcastPolicyManager() {
+    // this structurally hard-codes two compatible policies for Router and
+    // AMRMProxy.
+    routerFederationPolicy = TestSequentialRouterPolicy.class;
+    amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+  }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java
new file mode 100644
index 00000000000..e702b764fed
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/clientrm/TestSequentialRouterPolicy.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.router.clientrm;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.AbstractRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This is a test-only routing strategy whose purpose is to return
+ * subClusters in descending order of subClusterId.
+ *
+ * It is used to verify the retry behaviour of FederationClientInterceptor.
+ * The conditions of use are as follows:
+ * 1. Every subClusterId must be an integer.
+ * 2. The larger the subClusterId, the earlier that subCluster is selected.
+ *
+ * For example, with 4 subClusters (2 good and 2 bad) configured as
+ * good1 = [0], good2 = [1], bad1 = [2], bad2 = [3],
+ * this strategy returns them in the order [3, 2, 1, 0],
+ * so the selection order is bad2, bad1, good2, good1 and the bad
+ * subClusters are always tried first.
+ */
+public class TestSequentialRouterPolicy extends AbstractRouterPolicy {
+
+  @Override
+  public void reinitialize(FederationPolicyInitializationContext policyContext)
+      throws FederationPolicyInitializationException {
+    FederationPolicyInitializationContextValidator.validate(policyContext,
+        this.getClass().getCanonicalName());
+    setPolicyContext(policyContext);
+  }
+
+  @Override
+  protected SubClusterId chooseSubCluster(String queue,
+      Map<SubClusterId, SubClusterInfo> preSelectSubClusters) throws YarnException {
+    /*
+     * This strategy is only suitable for testing: it returns the subClusters
+     * sequentially. The retry tests use 3 subClusters, 1 good and 2 bad;
+     * the good subCluster has sc-id 0 and the bad ones have sc-ids 1 and 2.
+     * Sorting in descending order returns 2, 1, 0, so the bad subClusters
+     * are tried before the good one.
+     */
+    List<SubClusterId> subClusterIds = new ArrayList<>(preSelectSubClusters.keySet());
+    if (subClusterIds.size() > 1) {
+      subClusterIds.sort((o1, o2) -> Integer.parseInt(o2.getId()) - Integer.parseInt(o1.getId()));
+    }
+    if (CollectionUtils.isNotEmpty(subClusterIds)) {
+      return subClusterIds.get(0);
+    }
+    return null;
+  }
+}
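
The comparator in chooseSubCluster above sorts the candidate SubClusterIds
numerically in descending order and always picks the first one. Below is a
minimal, self-contained sketch of that ordering, with plain Strings standing in
for SubClusterId:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DescendingIdOrderDemo {
      public static void main(String[] args) {
        // "0" = good, "1" = bad1, "2" = bad2 (the ids used by the retry test).
        List<String> ids = new ArrayList<>(Arrays.asList("0", "2", "1"));
        // Same comparator as TestSequentialRouterPolicy, applied to raw id strings.
        ids.sort((o1, o2) -> Integer.parseInt(o2) - Integer.parseInt(o1));
        System.out.println(ids); // [2, 1, 0] -> bad2 first, then bad1, then good
      }
    }

Because the policy always returns the head of this ordering, the bad SubClusters
are tried before the good one, which is the deterministic behaviour the retry
test relies on (assuming previously blacklisted SubClusters have already been
filtered out of the candidates passed to chooseSubCluster).
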


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org