You are viewing a plain-text version of this content; the link to its canonical version is not included in this text.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/11/22 22:38:46 UTC

[hadoop] branch trunk updated: YARN-11371. [Federation] Refactor FederationInterceptorREST#createNewApplication\submitApplication Use FederationActionRetry. (#5130)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7cb22eb72d5 YARN-11371. [Federation] Refactor FederationInterceptorREST#createNewApplication\submitApplication Use FederationActionRetry. (#5130)
7cb22eb72d5 is described below

commit 7cb22eb72d587d4b721c3e072bb2811c767db9ab
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Wed Nov 23 06:38:24 2022 +0800

    YARN-11371. [Federation] Refactor FederationInterceptorREST#createNewApplication\submitApplication Use FederationActionRetry. (#5130)
---
 .../utils/FederationStateStoreFacade.java          | 170 ++++++++++-
 .../yarn/server/router/RouterServerUtil.java       |  45 ---
 .../clientrm/FederationClientInterceptor.java      |  96 +-----
 .../router/webapp/FederationInterceptorREST.java   | 324 +++++++++------------
 .../webapp/TestFederationInterceptorRESTRetry.java |  17 +-
 5 files changed, 334 insertions(+), 318 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 47cb9e9e35c..fc1e442ab9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -22,8 +22,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
+import java.util.ArrayList;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.Random;
 
 import javax.cache.Cache;
 import javax.cache.CacheManager;
@@ -38,6 +40,8 @@ import javax.cache.integration.CacheLoader;
 import javax.cache.integration.CacheLoaderException;
 import javax.cache.spi.CachingProvider;
 
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.retry.RetryPolicies;
@@ -50,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
 import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
@@ -110,6 +116,8 @@ public final class FederationStateStoreFacade {
   private static final FederationStateStoreFacade FACADE =
       new FederationStateStoreFacade();
 
+  private static Random rand = new Random(System.currentTimeMillis());
+
   private FederationStateStore stateStore;
   private int cacheTimeToLive;
   private Configuration conf;
@@ -496,6 +504,7 @@ public final class FederationStateStoreFacade {
    * @param defaultValue the default implementation for fallback
    * @param type the class for which a retry proxy is required
    * @param retryPolicy the policy for retrying method call failures
+   * @param <T> The type of the instance
    * @return a retry proxy for the specified interface
    */
   public static <T> Object createRetryInstance(Configuration conf,
@@ -731,7 +740,7 @@ public final class FederationStateStoreFacade {
     return stateStore;
   }
 
-  /*
+  /**
    * The Router Supports Store NewMasterKey (RouterMasterKey{@link RouterMasterKey}).
    *
    * @param newKey Key used for generating and verifying delegation tokens
@@ -849,4 +858,163 @@ public final class FederationStateStoreFacade {
     RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
     return stateStore.getTokenByRouterStoreToken(request);
   }
+
+  /**
+   * Get the number of active cluster nodes.
+   *
+   * @return number of active cluster nodes.
+   * @throws YarnException if the call to the state store is unsuccessful.
+   */
+  public int getActiveSubClustersCount() throws YarnException {
+    Map<SubClusterId, SubClusterInfo> activeSubClusters = getSubClusters(true);
+    if (activeSubClusters == null || activeSubClusters.isEmpty()) {
+      return 0;
+    } else {
+      return activeSubClusters.size();
+    }
+  }
+
+  /**
+   * Randomly pick ActiveSubCluster.
+   * During the selection process, we will exclude SubClusters from the blacklist.
+   *
+   * @param activeSubClusters List of active subClusters.
+   * @param blackList blacklist.
+   * @return Active SubClusterId.
+   * @throws YarnException When there is no Active SubCluster,
+   * an exception will be thrown (No active SubCluster available to submit the request.)
+   */
+  public static SubClusterId getRandomActiveSubCluster(
+      Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
+      throws YarnException {
+
+    // Check if activeSubClusters is empty, if it is empty, we need to throw an exception
+    if (MapUtils.isEmpty(activeSubClusters)) {
+      throw new FederationPolicyException(
+          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
+    }
+
+    // Change activeSubClusters to List
+    List<SubClusterId> subClusterIds = new ArrayList<>(activeSubClusters.keySet());
+
+    // If the blacklist is not empty, we need to remove all the subClusters in the blacklist
+    if (CollectionUtils.isNotEmpty(blackList)) {
+      subClusterIds.removeAll(blackList);
+    }
+
+    // Check there are still active subcluster after removing the blacklist
+    if (CollectionUtils.isEmpty(subClusterIds)) {
+      throw new FederationPolicyException(
+          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
+    }
+
+    // Randomly choose a SubCluster
+    return subClusterIds.get(rand.nextInt(subClusterIds.size()));
+  }
+
+  /**
+   * Get the number of retries.
+   *
+   * @param configRetries User-configured number of retries.
+   * @return number of retries.
+   * @throws YarnException yarn exception.
+   */
+  public int getRetryNumbers(int configRetries) throws YarnException {
+    int activeSubClustersCount = getActiveSubClustersCount();
+    int actualRetryNums = Math.min(activeSubClustersCount, configRetries);
+    // Normally, we don't set a negative number for the number of retries,
+    // but if the user sets a negative number for the number of retries,
+    // we will return 0
+    if (actualRetryNums < 0) {
+      return 0;
+    }
+    return actualRetryNums;
+  }
+
+  /**
+   * Query SubClusterId By applicationId.
+   *
+   * If SubClusterId is not empty, it means it exists and returns true;
+   * if SubClusterId is empty, it means it does not exist and returns false.
+   *
+   * @param applicationId applicationId
+   * @return true, SubClusterId exists; false, SubClusterId not exists.
+   */
+  public boolean existsApplicationHomeSubCluster(ApplicationId applicationId) {
+    try {
+      SubClusterId subClusterId = getApplicationHomeSubCluster(applicationId);
+      if (subClusterId != null) {
+        return true;
+      }
+    } catch (YarnException e) {
+      LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e);
+    }
+    return false;
+  }
+
+  /**
+   * Add ApplicationHomeSubCluster to FederationStateStore.
+   *
+   * @param applicationId applicationId.
+   * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
+   * @throws YarnException yarn exception.
+   */
+  public void addApplicationHomeSubCluster(ApplicationId applicationId,
+      ApplicationHomeSubCluster homeSubCluster) throws YarnException {
+    try {
+      addApplicationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      String msg = String.format(
+          "Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId);
+      throw new YarnException(msg, e);
+    }
+  }
+
+  /**
+   * Update ApplicationHomeSubCluster to FederationStateStore.
+   *
+   * @param subClusterId homeSubClusterId
+   * @param applicationId applicationId.
+   * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
+   * @throws YarnException yarn exception.
+   */
+  public void updateApplicationHomeSubCluster(SubClusterId subClusterId,
+      ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException {
+    try {
+      updateApplicationHomeSubCluster(homeSubCluster);
+    } catch (YarnException e) {
+      SubClusterId subClusterIdInStateStore = getApplicationHomeSubCluster(applicationId);
+      if (subClusterId == subClusterIdInStateStore) {
+        LOG.info("Application {} already submitted on SubCluster {}.", applicationId, subClusterId);
+      } else {
+        String msg = String.format(
+            "Unable to update the ApplicationId %s into the FederationStateStore.", applicationId);
+        throw new YarnException(msg, e);
+      }
+    }
+  }
+
+  /**
+   * Add or Update ApplicationHomeSubCluster.
+   *
+   * @param applicationId applicationId, is the id of the application.
+   * @param subClusterId homeSubClusterId, this is selected by strategy.
+   * @param retryCount number of retries.
+   * @throws YarnException yarn exception.
+   */
+  public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId,
+      SubClusterId subClusterId, int retryCount) throws YarnException {
+    Boolean exists = existsApplicationHomeSubCluster(applicationId);
+    ApplicationHomeSubCluster appHomeSubCluster =
+        ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+    if (!exists || retryCount == 0) {
+      // persist the mapping of applicationId and the subClusterId which has
+      // been selected as its home.
+      addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
+    } else {
+      // update the mapping of applicationId and the home subClusterId to
+      // the new subClusterId we have selected.
+      updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 6dd49daa4ed..93818229dd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.router;
 
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -30,9 +28,6 @@ import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,8 +36,6 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
-import java.util.Random;
 import java.io.IOException;
 
 /**
@@ -61,8 +54,6 @@ public final class RouterServerUtil {
 
   private static final String EPOCH_PREFIX = "e";
 
-  private static Random rand = new Random(System.currentTimeMillis());
-
   /** Disable constructor. */
   private RouterServerUtil() {
   }
@@ -479,42 +470,6 @@ public final class RouterServerUtil {
     }
   }
 
-  /**
-   * Randomly pick ActiveSubCluster.
-   * During the selection process, we will exclude SubClusters from the blacklist.
-   *
-   * @param activeSubClusters List of active subClusters.
-   * @param blackList blacklist.
-   * @return Active SubClusterId.
-   * @throws YarnException When there is no Active SubCluster,
-   * an exception will be thrown (No active SubCluster available to submit the request.)
-   */
-  public static SubClusterId getRandomActiveSubCluster(
-      Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
-      throws YarnException {
-
-    // Check if activeSubClusters is empty, if it is empty, we need to throw an exception
-    if (MapUtils.isEmpty(activeSubClusters)) {
-      logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
-    }
-
-    // Change activeSubClusters to List
-    List<SubClusterId> subClusterIds = new ArrayList<>(activeSubClusters.keySet());
-
-    // If the blacklist is not empty, we need to remove all the subClusters in the blacklist
-    if (CollectionUtils.isNotEmpty(blackList)) {
-      subClusterIds.removeAll(blackList);
-    }
-
-    // Check there are still active subcluster after removing the blacklist
-    if (CollectionUtils.isEmpty(subClusterIds)) {
-      logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
-    }
-
-    // Randomly choose a SubCluster
-    return subClusterIds.get(rand.nextInt(subClusterIds.size()));
-  }
-
   public static UserGroupInformation setupUser(final String userName) {
     UserGroupInformation user = null;
     try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 9bfcb04ff8a..cf457c70771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -272,17 +272,6 @@ public class FederationClientInterceptor
     return list.get(rand.nextInt(list.size()));
   }
 
-  @VisibleForTesting
-  private int getActiveSubClustersCount() throws YarnException {
-    Map<SubClusterId, SubClusterInfo> activeSubClusters =
-        federationFacade.getSubClusters(true);
-    if (activeSubClusters == null || activeSubClusters.isEmpty()) {
-      return 0;
-    } else {
-      return activeSubClusters.size();
-    }
-  }
-
   /**
    * YARN Router forwards every getNewApplication requests to any RM. During
    * this operation there will be no communication with the State Store. The
@@ -318,7 +307,7 @@ public class FederationClientInterceptor
 
     // Try calling the getNewApplication method
     List<SubClusterId> blacklist = new ArrayList<>();
-    int activeSubClustersCount = getActiveSubClustersCount();
+    int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
     int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
 
     try {
@@ -361,7 +350,7 @@ public class FederationClientInterceptor
       List<SubClusterId> blackList, GetNewApplicationRequest request, int retryCount)
       throws YarnException, IOException {
     SubClusterId subClusterId =
-        RouterServerUtil.getRandomActiveSubCluster(subClustersActive, blackList);
+        federationFacade.getRandomActiveSubCluster(subClustersActive, blackList);
     LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId);
     ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
     try {
@@ -474,7 +463,7 @@ public class FederationClientInterceptor
       // the user will provide us with an expected submitRetries,
       // but if the number of Active SubClusters is less than this number at this time,
       // we should provide a high number of retry according to the number of Active SubClusters.
-      int activeSubClustersCount = getActiveSubClustersCount();
+      int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
       int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
 
       // Try calling the SubmitApplication method
@@ -542,17 +531,10 @@ public class FederationClientInterceptor
       LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
           applicationId, retryCount, subClusterId);
 
-      // Step2. Query homeSubCluster according to ApplicationId.
-      Boolean exists = existsApplicationHomeSubCluster(applicationId);
-
-      ApplicationHomeSubCluster appHomeSubCluster =
-          ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
-
-      if (!exists || retryCount == 0) {
-        addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
-      } else {
-        updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
-      }
+      // Step2. We Store the mapping relationship
+      // between Application and HomeSubCluster in stateStore.
+      federationFacade.addOrUpdateApplicationHomeSubCluster(
+          applicationId, subClusterId, retryCount);
 
       // Step3. SubmitApplication to the subCluster
       ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
@@ -581,70 +563,6 @@ public class FederationClientInterceptor
     throw new YarnException(msg);
   }
 
-  /**
-   * Add ApplicationHomeSubCluster to FederationStateStore.
-   *
-   * @param applicationId applicationId.
-   * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
-   * @throws YarnException yarn exception.
-   */
-  private void addApplicationHomeSubCluster(ApplicationId applicationId,
-      ApplicationHomeSubCluster homeSubCluster) throws YarnException {
-    try {
-      federationFacade.addApplicationHomeSubCluster(homeSubCluster);
-    } catch (YarnException e) {
-      RouterServerUtil.logAndThrowException(e,
-          "Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId);
-    }
-  }
-
-  /**
-   * Update ApplicationHomeSubCluster to FederationStateStore.
-   *
-   * @param subClusterId homeSubClusterId
-   * @param applicationId applicationId.
-   * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
-   * @throws YarnException yarn exception.
-   */
-  private void updateApplicationHomeSubCluster(SubClusterId subClusterId,
-      ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException {
-    try {
-      federationFacade.updateApplicationHomeSubCluster(homeSubCluster);
-    } catch (YarnException e) {
-      SubClusterId subClusterIdInStateStore =
-          federationFacade.getApplicationHomeSubCluster(applicationId);
-      if (subClusterId == subClusterIdInStateStore) {
-        LOG.info("Application {} already submitted on SubCluster {}.",
-            applicationId, subClusterId);
-      } else {
-        RouterServerUtil.logAndThrowException(e,
-            "Unable to update the ApplicationId %s into the FederationStateStore.",
-            applicationId);
-      }
-    }
-  }
-
-  /**
-   * Query SubClusterId By applicationId.
-   *
-   * If SubClusterId is not empty, it means it exists and returns true;
-   * if SubClusterId is empty, it means it does not exist and returns false.
-   *
-   * @param applicationId applicationId
-   * @return true, SubClusterId exists; false, SubClusterId not exists.
-   */
-  private boolean existsApplicationHomeSubCluster(ApplicationId applicationId) {
-    try {
-      SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
-      if (subClusterId != null) {
-        return true;
-      }
-    } catch (YarnException e) {
-      LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e);
-    }
-    return false;
-  }
-
   /**
    * The YARN Router will forward to the respective YARN RM in which the AM is
    * running.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index dc3508e0dfa..a21be7b4e1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -27,14 +27,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletRequestWrapper;
@@ -62,9 +61,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
 import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
 import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
 import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
-import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
 import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
 import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -134,13 +134,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
   private int numSubmitRetries;
   private FederationStateStoreFacade federationFacade;
-  private Random rand;
   private RouterPolicyFacade policyFacade;
   private RouterMetrics routerMetrics;
   private final Clock clock = new MonotonicClock();
   private boolean returnPartialReport;
   private boolean appInfosCacheEnabled;
   private int appInfosCacheCount;
+  private long submitIntervalTime;
 
   private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
   private LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appInfosCaches;
@@ -156,7 +156,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
     super.init(user);
 
     federationFacade = FederationStateStoreFacade.getInstance();
-    rand = new Random();
 
     final Configuration conf = this.getConf();
 
@@ -194,24 +193,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
           YarnConfiguration.DEFAULT_ROUTER_APPSINFO_CACHED_COUNT);
       appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true);
     }
-  }
-
-  private SubClusterId getRandomActiveSubCluster(
-      Map<SubClusterId, SubClusterInfo> activeSubclusters,
-      List<SubClusterId> blackListSubClusters) throws YarnException {
-
-    if (activeSubclusters == null || activeSubclusters.size() < 1) {
-      RouterServerUtil.logAndThrowException(
-          FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
-    }
-    Collection<SubClusterId> keySet = activeSubclusters.keySet();
-    FederationPolicyUtils.validateSubClusterAvailability(keySet, blackListSubClusters);
-    if (blackListSubClusters != null) {
-      keySet.removeAll(blackListSubClusters);
-    }
 
-    List<SubClusterId> list = keySet.stream().collect(Collectors.toList());
-    return list.get(rand.nextInt(list.size()));
+    submitIntervalTime = conf.getTimeDuration(
+        YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME,
+        YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS);
   }
 
   @VisibleForTesting
@@ -301,62 +286,79 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     long startTime = clock.getTime();
 
-    Map<SubClusterId, SubClusterInfo> subClustersActive;
     try {
-      subClustersActive = federationFacade.getSubClusters(true);
-    } catch (YarnException e) {
+      Map<SubClusterId, SubClusterInfo> subClustersActive =
+          federationFacade.getSubClusters(true);
+
+      // We declare blackList and retries.
+      List<SubClusterId> blackList = new ArrayList<>();
+      int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries);
+      Response response = ((FederationActionRetry<Response>) (retryCount) ->
+          invokeGetNewApplication(subClustersActive, blackList, hsr, retryCount)).
+          runWithRetries(actualRetryNums, submitIntervalTime);
+
+      // If the response is not empty and the status is SC_OK,
+      // this request can be returned directly.
+      if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsCreated(stopTime - startTime);
+        return response;
+      }
+    } catch (FederationPolicyException e) {
+      // If a FederationPolicyException is thrown, the service is unavailable.
       routerMetrics.incrAppsFailedCreated();
-      return Response.status(Status.INTERNAL_SERVER_ERROR)
-          .entity(e.getLocalizedMessage()).build();
+      return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
+    } catch (Exception e) {
+      routerMetrics.incrAppsFailedCreated();
+      return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build();
     }
 
-    List<SubClusterId> blacklist = new ArrayList<>();
-
-    for (int i = 0; i < numSubmitRetries; ++i) {
-
-      SubClusterId subClusterId;
-      try {
-        subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
-      } catch (YarnException e) {
-        routerMetrics.incrAppsFailedCreated();
-        return Response.status(Status.SERVICE_UNAVAILABLE)
-            .entity(e.getLocalizedMessage()).build();
-      }
+    // return error message directly.
+    String errMsg = "Fail to create a new application.";
+    LOG.error(errMsg);
+    routerMetrics.incrAppsFailedCreated();
+    return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
+  }
 
-      LOG.debug("getNewApplication try #{} on SubCluster {}.", i, subClusterId);
+  /**
+   * Invoke GetNewApplication to different subClusters.
+   *
+   * @param subClustersActive Active SubClusters.
+   * @param blackList Blacklist avoid repeated calls to unavailable subCluster.
+   * @param hsr HttpServletRequest.
+   * @param retryCount number of retries.
+   * @return Get response, If the response is empty or status not equal SC_OK, the request fails,
+   * if the response is not empty and status equal SC_OK, the request is successful.
+   * @throws YarnException yarn exception.
+   * @throws IOException io error.
+   * @throws InterruptedException interrupted exception.
+   */
+  private Response invokeGetNewApplication(Map<SubClusterId, SubClusterInfo> subClustersActive,
+      List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
+      throws YarnException, IOException, InterruptedException {
 
-      DefaultRequestInterceptorREST interceptor =
-          getOrCreateInterceptorForSubCluster(subClusterId,
-              subClustersActive.get(subClusterId).getRMWebServiceAddress());
-      Response response = null;
-      try {
-        response = interceptor.createNewApplication(hsr);
-      } catch (Exception e) {
-        LOG.warn("Unable to create a new ApplicationId in SubCluster {}.",
-            subClusterId.getId(), e);
-      }
+    SubClusterId subClusterId =
+        federationFacade.getRandomActiveSubCluster(subClustersActive, blackList);
 
-      if (response != null &&
-          response.getStatus() == HttpServletResponse.SC_OK) {
+    LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId);
 
-        long stopTime = clock.getTime();
-        routerMetrics.succeededAppsCreated(stopTime - startTime);
+    DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId,
+        subClustersActive.get(subClusterId).getRMWebServiceAddress());
 
+    try {
+      Response response = interceptor.createNewApplication(hsr);
+      if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
         return response;
-      } else {
-        // Empty response from the ResourceManager.
-        // Blacklist this subcluster for this request.
-        blacklist.add(subClusterId);
       }
+    } catch (Exception e) {
+      blackList.add(subClusterId);
+      RouterServerUtil.logAndThrowException(e.getMessage(), e);
     }
 
-    String errMsg = "Fail to create a new application.";
-    LOG.error(errMsg);
-    routerMetrics.incrAppsFailedCreated();
-    return Response
-        .status(Status.INTERNAL_SERVER_ERROR)
-        .entity(errMsg)
-        .build();
+    // We need to throw the exception directly.
+    String msg = String.format("Unable to create a new ApplicationId in SubCluster %s.",
+        subClusterId.getId());
+    throw new YarnException(msg);
   }
 
   /**
@@ -431,142 +433,106 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
 
     long startTime = clock.getTime();
 
+    // We verify the parameters to ensure that newApp is not empty and
+    // that the format of applicationId is correct.
     if (newApp == null || newApp.getApplicationId() == null) {
       routerMetrics.incrAppsFailedSubmitted();
       String errMsg = "Missing ApplicationSubmissionContextInfo or "
           + "applicationSubmissionContext information.";
-      return Response
-          .status(Status.BAD_REQUEST)
-          .entity(errMsg)
-          .build();
+      return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
     }
 
-    ApplicationId applicationId = null;
     try {
-      applicationId = ApplicationId.fromString(newApp.getApplicationId());
+      String applicationId = newApp.getApplicationId();
+      RouterServerUtil.validateApplicationId(applicationId);
     } catch (IllegalArgumentException e) {
       routerMetrics.incrAppsFailedSubmitted();
-      return Response
-          .status(Status.BAD_REQUEST)
-          .entity(e.getLocalizedMessage())
-          .build();
+      return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()).build();
     }
 
-    List<SubClusterId> blacklist = new ArrayList<>();
-
-    for (int i = 0; i < numSubmitRetries; ++i) {
-
-      ApplicationSubmissionContext context =
-          RMWebAppUtil.createAppSubmissionContext(newApp, this.getConf());
-
-      SubClusterId subClusterId = null;
-      try {
-        subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
-      } catch (YarnException e) {
-        routerMetrics.incrAppsFailedSubmitted();
-        return Response
-            .status(Status.SERVICE_UNAVAILABLE)
-            .entity(e.getLocalizedMessage())
-            .build();
+    List<SubClusterId> blackList = new ArrayList<>();
+    try {
+      int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
+      int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
+      Response response = ((FederationActionRetry<Response>) (retryCount) ->
+          invokeSubmitApplication(newApp, blackList, hsr, retryCount)).
+          runWithRetries(actualRetryNums, submitIntervalTime);
+      if (response != null) {
+        long stopTime = clock.getTime();
+        routerMetrics.succeededAppsSubmitted(stopTime - startTime);
+        return response;
       }
-      LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
-          applicationId, i, subClusterId);
+    } catch (Exception e) {
+      routerMetrics.incrAppsFailedSubmitted();
+      return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
+    }
 
-      ApplicationHomeSubCluster appHomeSubCluster =
-          ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+    routerMetrics.incrAppsFailedSubmitted();
+    String errMsg = String.format("Application %s with appId %s failed to be submitted.",
+        newApp.getApplicationName(), newApp.getApplicationId());
+    LOG.error(errMsg);
+    return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build();
+  }
 
-      if (i == 0) {
-        try {
-          // persist the mapping of applicationId and the subClusterId which has
-          // been selected as its home
-          subClusterId =
-              federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
-        } catch (YarnException e) {
-          routerMetrics.incrAppsFailedSubmitted();
-          String errMsg = "Unable to insert the ApplicationId " + applicationId
-              + " into the FederationStateStore";
-          return Response
-              .status(Status.SERVICE_UNAVAILABLE)
-              .entity(errMsg + " " + e.getLocalizedMessage())
-              .build();
-        }
-      } else {
-        try {
-          // update the mapping of applicationId and the home subClusterId to
-          // the new subClusterId we have selected
-          federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
-        } catch (YarnException e) {
-          String errMsg = "Unable to update the ApplicationId " + applicationId
-              + " into the FederationStateStore";
-          SubClusterId subClusterIdInStateStore;
-          try {
-            subClusterIdInStateStore =
-                federationFacade.getApplicationHomeSubCluster(applicationId);
-          } catch (YarnException e1) {
-            routerMetrics.incrAppsFailedSubmitted();
-            return Response
-                .status(Status.SERVICE_UNAVAILABLE)
-                .entity(e1.getLocalizedMessage())
-                .build();
-          }
-          if (subClusterId == subClusterIdInStateStore) {
-            LOG.info("Application {} already submitted on SubCluster {}.",
-                applicationId, subClusterId);
-          } else {
-            routerMetrics.incrAppsFailedSubmitted();
-            return Response
-                .status(Status.SERVICE_UNAVAILABLE)
-                .entity(errMsg)
-                .build();
-          }
-        }
-      }
+  /**
+   * Submit the application to one subCluster chosen by the policy facade.
+   *
+   * @param submissionContext application submission context.
+   * @param blackList subClusters to exclude; the chosen one is added on failure.
+   * @param hsr HttpServletRequest.
+   * @param retryCount attempt number, passed to addOrUpdateApplicationHomeSubCluster.
+   * @return the subCluster response. Only a non-null response with status
+   * SC_ACCEPTED is returned; any other response results in a YarnException.
+   * @throws YarnException if the response is not SC_ACCEPTED or a facade call fails.
+   * @throws IOException io error.
+   * @throws InterruptedException declared by the signature; propagated unchanged.
+   */
+  private Response invokeSubmitApplication(ApplicationSubmissionContextInfo submissionContext,
+      List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
+      throws YarnException, IOException, InterruptedException {
+
+    // Step1. Convert the ApplicationSubmissionContextInfo into an ApplicationSubmissionContext
+    // (needed by the policy) and parse the applicationId.
+    ApplicationSubmissionContext context =
+        RMWebAppUtil.createAppSubmissionContext(submissionContext, this.getConf());
+    ApplicationId applicationId = ApplicationId.fromString(submissionContext.getApplicationId());
+    SubClusterId subClusterId = null; // stays null if the policy call fails
 
-      SubClusterInfo subClusterInfo;
-      try {
-        subClusterInfo = federationFacade.getSubCluster(subClusterId);
-      } catch (YarnException e) {
-        routerMetrics.incrAppsFailedSubmitted();
-        return Response
-            .status(Status.SERVICE_UNAVAILABLE)
-            .entity(e.getLocalizedMessage())
-            .build();
-      }
+    try {
+      // Ask the policy for a home subCluster, skipping blacklisted ones.
+      subClusterId = policyFacade.getHomeSubcluster(context, blackList);
 
-      Response response = null;
-      try {
-        response = getOrCreateInterceptorForSubCluster(subClusterId,
-            subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp,
-                hsr);
-      } catch (Exception e) {
-        LOG.warn("Unable to submit the application {} to SubCluster {}",
-            applicationId, subClusterId.getId(), e);
-      }
+      // Log which subCluster this attempt targets.
+      LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
+          applicationId, retryCount, subClusterId);
 
-      if (response != null &&
-          response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
-        LOG.info("Application {} with appId {} submitted on {}",
-            context.getApplicationName(), applicationId, subClusterId);
+      // Step2. Persist the applicationId -> home subClusterId mapping in the state
+      // store (retryCount is forwarded, presumably to choose add vs. update).
+      federationFacade.addOrUpdateApplicationHomeSubCluster(
+          applicationId, subClusterId, retryCount);
 
-        long stopTime = clock.getTime();
-        routerMetrics.succeededAppsSubmitted(stopTime - startTime);
+      // Step3. Look up the SubClusterInfo (RM web address) for the chosen subCluster.
+      SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
 
+      // Step4. Submit the request; only an HttpServletResponse.SC_ACCEPTED response
+      // is returned, anything else falls through to the YarnException below.
+      Response response = getOrCreateInterceptorForSubCluster(subClusterId,
+          subClusterInfo.getRMWebServiceAddress()).submitApplication(submissionContext, hsr);
+      if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
+        LOG.info("Application {} with appId {} submitted on {}.",
+            context.getApplicationName(), applicationId, subClusterId);
         return response;
-      } else {
-        // Empty response from the ResourceManager.
-        // Blacklist this subcluster for this request.
-        blacklist.add(subClusterId);
       }
+      String msg = String.format("application %s failed to be submitted.", applicationId);
+      throw new YarnException(msg);
+    } catch (Exception e) {
+      LOG.warn("Unable to submit the application {} to SubCluster {}.", applicationId,
+          subClusterId, e);
+      if (subClusterId != null) {
+        blackList.add(subClusterId); // skip this subCluster on the next retry
+      }
+      throw e;
     }
-
-    routerMetrics.incrAppsFailedSubmitted();
-    String errMsg = "Application " + newApp.getApplicationName()
-        + " with appId " + applicationId + " failed to be submitted.";
-    LOG.error(errMsg);
-    return Response
-        .status(Status.SERVICE_UNAVAILABLE)
-        .entity(errMsg)
-        .build();
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
index e598a52f877..e2b2103c7de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
@@ -180,6 +180,9 @@ public class TestFederationInterceptorRESTRetry
   @Test
   public void testGetNewApplicationTwoBadSCs()
       throws YarnException, IOException, InterruptedException {
+
+    LOG.info("Test getNewApplication with two bad SCs.");
+
     setupCluster(Arrays.asList(bad1, bad2));
 
     Response response = interceptor.createNewApplication(null);
@@ -195,17 +198,21 @@ public class TestFederationInterceptorRESTRetry
   @Test
   public void testGetNewApplicationOneBadOneGood()
       throws YarnException, IOException, InterruptedException {
-    System.out.println("Test getNewApplication with one bad, one good SC");
+
+    LOG.info("Test getNewApplication with one bad, one good SC.");
+    // One healthy SC (good) and one failing SC (bad2); creation must land on good.
     setupCluster(Arrays.asList(good, bad2));
     Response response = interceptor.createNewApplication(null);
-
+    Assert.assertNotNull(response);
     Assert.assertEquals(OK, response.getStatus());
 
     NewApplication newApp = (NewApplication) response.getEntity();
+    Assert.assertNotNull(newApp);
+
     ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
+    Assert.assertNotNull(appId);
 
-    Assert.assertEquals(Integer.parseInt(good.getId()),
-        appId.getClusterTimestamp());
+    Assert.assertEquals(Integer.parseInt(good.getId()), appId.getClusterTimestamp());
   }
 
   /**
@@ -216,6 +223,8 @@ public class TestFederationInterceptorRESTRetry
   public void testSubmitApplicationOneBadSC()
       throws YarnException, IOException, InterruptedException {
 
+    LOG.info("Test submitApplication with one bad SC.");
+
     setupCluster(Arrays.asList(bad2));
 
     ApplicationId appId =


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org