You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2022/11/22 22:38:46 UTC
[hadoop] branch trunk updated: YARN-11371. [Federation] Refactor FederationInterceptorREST#createNewApplication\submitApplication Use FederationActionRetry. (#5130)
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7cb22eb72d5 YARN-11371. [Federation] Refactor FederationInterceptorREST#createNewApplication\submitApplication Use FederationActionRetry. (#5130)
7cb22eb72d5 is described below
commit 7cb22eb72d587d4b721c3e072bb2811c767db9ab
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Wed Nov 23 06:38:24 2022 +0800
YARN-11371. [Federation] Refactor FederationInterceptorREST#createNewApplication\submitApplication Use FederationActionRetry. (#5130)
---
.../utils/FederationStateStoreFacade.java | 170 ++++++++++-
.../yarn/server/router/RouterServerUtil.java | 45 ---
.../clientrm/FederationClientInterceptor.java | 96 +-----
.../router/webapp/FederationInterceptorREST.java | 324 +++++++++------------
.../webapp/TestFederationInterceptorRESTRetry.java | 17 +-
5 files changed, 334 insertions(+), 318 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
index 47cb9e9e35c..fc1e442ab9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/utils/FederationStateStoreFacade.java
@@ -22,8 +22,10 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
+import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.Random;
import javax.cache.Cache;
import javax.cache.CacheManager;
@@ -38,6 +40,8 @@ import javax.cache.integration.CacheLoader;
import javax.cache.integration.CacheLoaderException;
import javax.cache.spi.CachingProvider;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.RetryPolicies;
@@ -50,6 +54,8 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateStoreRetriableException;
@@ -110,6 +116,8 @@ public final class FederationStateStoreFacade {
private static final FederationStateStoreFacade FACADE =
new FederationStateStoreFacade();
+ private static Random rand = new Random(System.currentTimeMillis());
+
private FederationStateStore stateStore;
private int cacheTimeToLive;
private Configuration conf;
@@ -496,6 +504,7 @@ public final class FederationStateStoreFacade {
* @param defaultValue the default implementation for fallback
* @param type the class for which a retry proxy is required
* @param retryPolicy the policy for retrying method call failures
+ * @param <T> The type of the instance
* @return a retry proxy for the specified interface
*/
public static <T> Object createRetryInstance(Configuration conf,
@@ -731,7 +740,7 @@ public final class FederationStateStoreFacade {
return stateStore;
}
- /*
+ /**
* The Router Supports Store NewMasterKey (RouterMasterKey{@link RouterMasterKey}).
*
* @param newKey Key used for generating and verifying delegation tokens
@@ -849,4 +858,163 @@ public final class FederationStateStoreFacade {
RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken);
return stateStore.getTokenByRouterStoreToken(request);
}
+
+ /**
+ * Get the number of active SubClusters.
+ *
+ * @return number of active SubClusters, or 0 when none are reported.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ */
+ public int getActiveSubClustersCount() throws YarnException {
+ Map<SubClusterId, SubClusterInfo> activeSubClusters = getSubClusters(true);
+ // Treat a null map the same as an empty one: no active SubCluster.
+ if (MapUtils.isEmpty(activeSubClusters)) {
+ return 0;
+ }
+ return activeSubClusters.size();
+ }
+
+ /**
+ * Randomly pick an active SubCluster that is not in the blacklist.
+ * The selection works on a copy of the key set, so neither argument is modified.
+ *
+ * @param activeSubClusters map of active SubClusters, keyed by SubClusterId.
+ * @param blackList SubClusters to exclude from the selection; may be null or empty.
+ * @return a randomly selected active SubClusterId that is not blacklisted.
+ * @throws YarnException a FederationPolicyException with message NO_ACTIVE_SUBCLUSTER_AVAILABLE
+ * when there is no active SubCluster, or when every active SubCluster is blacklisted.
+ */
+ public static SubClusterId getRandomActiveSubCluster(
+ Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
+ throws YarnException {
+
+ // Without any active SubCluster there is nothing to choose from.
+ if (MapUtils.isEmpty(activeSubClusters)) {
+ throw new FederationPolicyException(
+ FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
+ }
+
+ // Work on a copy of the key set so the caller's map is left untouched,
+ // then drop every blacklisted SubCluster from the candidates.
+ List<SubClusterId> candidates = new ArrayList<>(activeSubClusters.keySet());
+ if (CollectionUtils.isNotEmpty(blackList)) {
+ candidates.removeAll(blackList);
+ }
+
+ // Every active SubCluster may have been blacklisted.
+ if (candidates.isEmpty()) {
+ throw new FederationPolicyException(
+ FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE);
+ }
+
+ // Pick one of the remaining candidates at random.
+ int index = rand.nextInt(candidates.size());
+ return candidates.get(index);
+ }
+
+ /**
+ * Compute how many attempts a retried call should make.
+ *
+ * The result is the configured retry count capped at the number of active
+ * SubClusters, and is never negative.
+ *
+ * @param configRetries User-configured number of retries.
+ * @return number of retries, between 0 and the active SubCluster count.
+ * @throws YarnException if the call to the state store is unsuccessful.
+ */
+ public int getRetryNumbers(int configRetries) throws YarnException {
+ int activeSubClustersCount = getActiveSubClustersCount();
+ // Cap the retries at the number of active SubClusters.
+ int boundedRetries = Math.min(configRetries, activeSubClustersCount);
+ // A negative retry count is not expected, but if the user configures
+ // one anyway, fall back to 0.
+ return Math.max(boundedRetries, 0);
+ }
+
+ /**
+ * Check whether a home SubCluster is recorded for the given application.
+ *
+ * A YarnException from the lookup is logged as a warning and reported as
+ * "not found" instead of being propagated.
+ *
+ * @param applicationId applicationId
+ * @return true if a non-null home SubClusterId was found; false if none was
+ * found or the lookup failed.
+ */
+ public boolean existsApplicationHomeSubCluster(ApplicationId applicationId) {
+ boolean found = false;
+ try {
+ found = getApplicationHomeSubCluster(applicationId) != null;
+ } catch (YarnException e) {
+ // Best effort: a failed lookup is treated the same as a missing mapping.
+ LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e);
+ }
+ return found;
+ }
+
+ /**
+ * Persist the mapping between an application and its home SubCluster.
+ *
+ * @param applicationId applicationId, used only to build the error message.
+ * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
+ * @throws YarnException if storing the mapping fails; the original exception
+ * is attached as the cause.
+ */
+ public void addApplicationHomeSubCluster(ApplicationId applicationId,
+ ApplicationHomeSubCluster homeSubCluster) throws YarnException {
+ try {
+ addApplicationHomeSubCluster(homeSubCluster);
+ } catch (YarnException cause) {
+ throw new YarnException(String.format(
+ "Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId), cause);
+ }
+ }
+
+ /**
+ * Update the mapping between an application and its home SubCluster.
+ * A failed update is tolerated if the store already maps the app to subClusterId.
+ *
+ * @param subClusterId homeSubClusterId the application should be mapped to.
+ * @param applicationId applicationId.
+ * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
+ * @throws YarnException if the update fails and the stored mapping differs (or cannot be read).
+ */
+ public void updateApplicationHomeSubCluster(SubClusterId subClusterId,
+ ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException {
+ try {
+ updateApplicationHomeSubCluster(homeSubCluster);
+ } catch (YarnException e) {
+ SubClusterId subClusterIdInStateStore = getApplicationHomeSubCluster(applicationId);
+ // Compare by value: the id read back from the store need not be the same instance.
+ if (subClusterId == null || !subClusterId.equals(subClusterIdInStateStore)) {
+ throw new YarnException(String.format(
+ "Unable to update the ApplicationId %s into the FederationStateStore.", applicationId), e);
+ }
+ LOG.info("Application {} already submitted on SubCluster {}.", applicationId, subClusterId);
+ }
+ }
+
+ /**
+ * Add or update the ApplicationHomeSubCluster mapping: add it on the first attempt
+ * (retryCount == 0) or when none exists yet, otherwise update it to subClusterId.
+ *
+ * @param applicationId applicationId, is the id of the application.
+ * @param subClusterId homeSubClusterId, this is selected by strategy.
+ * @param retryCount current attempt number; 0 is treated as the first attempt.
+ * @throws YarnException if adding or updating the mapping fails.
+ */
+ public void addOrUpdateApplicationHomeSubCluster(ApplicationId applicationId,
+ SubClusterId subClusterId, int retryCount) throws YarnException {
+ ApplicationHomeSubCluster appHomeSubCluster =
+ ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+ // Short-circuit: the state store lookup is only needed on a retry, because the
+ // first attempt always adds the mapping.
+ if (retryCount == 0 || !existsApplicationHomeSubCluster(applicationId)) {
+ // First attempt or no mapping yet: persist applicationId -> subClusterId.
+ addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
+ } else {
+ // Retry with an existing mapping: point it at the newly selected SubCluster.
+ updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
index 6dd49daa4ed..93818229dd1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/RouterServerUtil.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.yarn.server.router;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -30,9 +28,6 @@ import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,8 +36,6 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.Random;
import java.io.IOException;
/**
@@ -61,8 +54,6 @@ public final class RouterServerUtil {
private static final String EPOCH_PREFIX = "e";
- private static Random rand = new Random(System.currentTimeMillis());
-
/** Disable constructor. */
private RouterServerUtil() {
}
@@ -479,42 +470,6 @@ public final class RouterServerUtil {
}
}
- /**
- * Randomly pick ActiveSubCluster.
- * During the selection process, we will exclude SubClusters from the blacklist.
- *
- * @param activeSubClusters List of active subClusters.
- * @param blackList blacklist.
- * @return Active SubClusterId.
- * @throws YarnException When there is no Active SubCluster,
- * an exception will be thrown (No active SubCluster available to submit the request.)
- */
- public static SubClusterId getRandomActiveSubCluster(
- Map<SubClusterId, SubClusterInfo> activeSubClusters, List<SubClusterId> blackList)
- throws YarnException {
-
- // Check if activeSubClusters is empty, if it is empty, we need to throw an exception
- if (MapUtils.isEmpty(activeSubClusters)) {
- logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
- }
-
- // Change activeSubClusters to List
- List<SubClusterId> subClusterIds = new ArrayList<>(activeSubClusters.keySet());
-
- // If the blacklist is not empty, we need to remove all the subClusters in the blacklist
- if (CollectionUtils.isNotEmpty(blackList)) {
- subClusterIds.removeAll(blackList);
- }
-
- // Check there are still active subcluster after removing the blacklist
- if (CollectionUtils.isEmpty(subClusterIds)) {
- logAndThrowException(FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
- }
-
- // Randomly choose a SubCluster
- return subClusterIds.get(rand.nextInt(subClusterIds.size()));
- }
-
public static UserGroupInformation setupUser(final String userName) {
UserGroupInformation user = null;
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
index 9bfcb04ff8a..cf457c70771 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/clientrm/FederationClientInterceptor.java
@@ -272,17 +272,6 @@ public class FederationClientInterceptor
return list.get(rand.nextInt(list.size()));
}
- @VisibleForTesting
- private int getActiveSubClustersCount() throws YarnException {
- Map<SubClusterId, SubClusterInfo> activeSubClusters =
- federationFacade.getSubClusters(true);
- if (activeSubClusters == null || activeSubClusters.isEmpty()) {
- return 0;
- } else {
- return activeSubClusters.size();
- }
- }
-
/**
* YARN Router forwards every getNewApplication requests to any RM. During
* this operation there will be no communication with the State Store. The
@@ -318,7 +307,7 @@ public class FederationClientInterceptor
// Try calling the getNewApplication method
List<SubClusterId> blacklist = new ArrayList<>();
- int activeSubClustersCount = getActiveSubClustersCount();
+ int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
try {
@@ -361,7 +350,7 @@ public class FederationClientInterceptor
List<SubClusterId> blackList, GetNewApplicationRequest request, int retryCount)
throws YarnException, IOException {
SubClusterId subClusterId =
- RouterServerUtil.getRandomActiveSubCluster(subClustersActive, blackList);
+ federationFacade.getRandomActiveSubCluster(subClustersActive, blackList);
LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId);
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
try {
@@ -474,7 +463,7 @@ public class FederationClientInterceptor
// the user will provide us with an expected submitRetries,
// but if the number of Active SubClusters is less than this number at this time,
// we should provide a high number of retry according to the number of Active SubClusters.
- int activeSubClustersCount = getActiveSubClustersCount();
+ int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
// Try calling the SubmitApplication method
@@ -542,17 +531,10 @@ public class FederationClientInterceptor
LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
applicationId, retryCount, subClusterId);
- // Step2. Query homeSubCluster according to ApplicationId.
- Boolean exists = existsApplicationHomeSubCluster(applicationId);
-
- ApplicationHomeSubCluster appHomeSubCluster =
- ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
-
- if (!exists || retryCount == 0) {
- addApplicationHomeSubCluster(applicationId, appHomeSubCluster);
- } else {
- updateApplicationHomeSubCluster(subClusterId, applicationId, appHomeSubCluster);
- }
+ // Step2. We Store the mapping relationship
+ // between Application and HomeSubCluster in stateStore.
+ federationFacade.addOrUpdateApplicationHomeSubCluster(
+ applicationId, subClusterId, retryCount);
// Step3. SubmitApplication to the subCluster
ApplicationClientProtocol clientRMProxy = getClientRMProxyForSubCluster(subClusterId);
@@ -581,70 +563,6 @@ public class FederationClientInterceptor
throw new YarnException(msg);
}
- /**
- * Add ApplicationHomeSubCluster to FederationStateStore.
- *
- * @param applicationId applicationId.
- * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
- * @throws YarnException yarn exception.
- */
- private void addApplicationHomeSubCluster(ApplicationId applicationId,
- ApplicationHomeSubCluster homeSubCluster) throws YarnException {
- try {
- federationFacade.addApplicationHomeSubCluster(homeSubCluster);
- } catch (YarnException e) {
- RouterServerUtil.logAndThrowException(e,
- "Unable to insert the ApplicationId %s into the FederationStateStore.", applicationId);
- }
- }
-
- /**
- * Update ApplicationHomeSubCluster to FederationStateStore.
- *
- * @param subClusterId homeSubClusterId
- * @param applicationId applicationId.
- * @param homeSubCluster homeSubCluster, homeSubCluster selected according to policy.
- * @throws YarnException yarn exception.
- */
- private void updateApplicationHomeSubCluster(SubClusterId subClusterId,
- ApplicationId applicationId, ApplicationHomeSubCluster homeSubCluster) throws YarnException {
- try {
- federationFacade.updateApplicationHomeSubCluster(homeSubCluster);
- } catch (YarnException e) {
- SubClusterId subClusterIdInStateStore =
- federationFacade.getApplicationHomeSubCluster(applicationId);
- if (subClusterId == subClusterIdInStateStore) {
- LOG.info("Application {} already submitted on SubCluster {}.",
- applicationId, subClusterId);
- } else {
- RouterServerUtil.logAndThrowException(e,
- "Unable to update the ApplicationId %s into the FederationStateStore.",
- applicationId);
- }
- }
- }
-
- /**
- * Query SubClusterId By applicationId.
- *
- * If SubClusterId is not empty, it means it exists and returns true;
- * if SubClusterId is empty, it means it does not exist and returns false.
- *
- * @param applicationId applicationId
- * @return true, SubClusterId exists; false, SubClusterId not exists.
- */
- private boolean existsApplicationHomeSubCluster(ApplicationId applicationId) {
- try {
- SubClusterId subClusterId = federationFacade.getApplicationHomeSubCluster(applicationId);
- if (subClusterId != null) {
- return true;
- }
- } catch (YarnException e) {
- LOG.warn("get homeSubCluster by applicationId = {} error.", applicationId, e);
- }
- return false;
- }
-
/**
* The YARN Router will forward to the respective YARN RM in which the AM is
* running.
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
index dc3508e0dfa..a21be7b4e1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/main/java/org/apache/hadoop/yarn/server/router/webapp/FederationInterceptorREST.java
@@ -27,14 +27,13 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Random;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
@@ -62,9 +61,10 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.RouterPolicyFacade;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
-import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.retry.FederationActionRetry;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -134,13 +134,13 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
private int numSubmitRetries;
private FederationStateStoreFacade federationFacade;
- private Random rand;
private RouterPolicyFacade policyFacade;
private RouterMetrics routerMetrics;
private final Clock clock = new MonotonicClock();
private boolean returnPartialReport;
private boolean appInfosCacheEnabled;
private int appInfosCacheCount;
+ private long submitIntervalTime;
private Map<SubClusterId, DefaultRequestInterceptorREST> interceptors;
private LRUCacheHashMap<RouterAppInfoCacheKey, AppsInfo> appInfosCaches;
@@ -156,7 +156,6 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
super.init(user);
federationFacade = FederationStateStoreFacade.getInstance();
- rand = new Random();
final Configuration conf = this.getConf();
@@ -194,24 +193,10 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
YarnConfiguration.DEFAULT_ROUTER_APPSINFO_CACHED_COUNT);
appInfosCaches = new LRUCacheHashMap<>(appInfosCacheCount, true);
}
- }
-
- private SubClusterId getRandomActiveSubCluster(
- Map<SubClusterId, SubClusterInfo> activeSubclusters,
- List<SubClusterId> blackListSubClusters) throws YarnException {
-
- if (activeSubclusters == null || activeSubclusters.size() < 1) {
- RouterServerUtil.logAndThrowException(
- FederationPolicyUtils.NO_ACTIVE_SUBCLUSTER_AVAILABLE, null);
- }
- Collection<SubClusterId> keySet = activeSubclusters.keySet();
- FederationPolicyUtils.validateSubClusterAvailability(keySet, blackListSubClusters);
- if (blackListSubClusters != null) {
- keySet.removeAll(blackListSubClusters);
- }
- List<SubClusterId> list = keySet.stream().collect(Collectors.toList());
- return list.get(rand.nextInt(list.size()));
+ submitIntervalTime = conf.getTimeDuration(
+ YarnConfiguration.ROUTER_CLIENTRM_SUBMIT_INTERVAL_TIME,
+ YarnConfiguration.DEFAULT_CLIENTRM_SUBMIT_INTERVAL_TIME, TimeUnit.MILLISECONDS);
}
@VisibleForTesting
@@ -301,62 +286,79 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
long startTime = clock.getTime();
- Map<SubClusterId, SubClusterInfo> subClustersActive;
try {
- subClustersActive = federationFacade.getSubClusters(true);
- } catch (YarnException e) {
+ Map<SubClusterId, SubClusterInfo> subClustersActive =
+ federationFacade.getSubClusters(true);
+
+ // We declare blackList and retries.
+ List<SubClusterId> blackList = new ArrayList<>();
+ int actualRetryNums = federationFacade.getRetryNumbers(numSubmitRetries);
+ Response response = ((FederationActionRetry<Response>) (retryCount) ->
+ invokeGetNewApplication(subClustersActive, blackList, hsr, retryCount)).
+ runWithRetries(actualRetryNums, submitIntervalTime);
+
+ // If the response is not empty and the status is SC_OK,
+ // this request can be returned directly.
+ if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsCreated(stopTime - startTime);
+ return response;
+ }
+ } catch (FederationPolicyException e) {
+ // If a FederationPolicyException is thrown, the service is unavailable.
routerMetrics.incrAppsFailedCreated();
- return Response.status(Status.INTERNAL_SERVER_ERROR)
- .entity(e.getLocalizedMessage()).build();
+ return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
+ } catch (Exception e) {
+ routerMetrics.incrAppsFailedCreated();
+ return Response.status(Status.INTERNAL_SERVER_ERROR).entity(e.getLocalizedMessage()).build();
}
- List<SubClusterId> blacklist = new ArrayList<>();
-
- for (int i = 0; i < numSubmitRetries; ++i) {
-
- SubClusterId subClusterId;
- try {
- subClusterId = getRandomActiveSubCluster(subClustersActive, blacklist);
- } catch (YarnException e) {
- routerMetrics.incrAppsFailedCreated();
- return Response.status(Status.SERVICE_UNAVAILABLE)
- .entity(e.getLocalizedMessage()).build();
- }
+ // return error message directly.
+ String errMsg = "Fail to create a new application.";
+ LOG.error(errMsg);
+ routerMetrics.incrAppsFailedCreated();
+ return Response.status(Status.INTERNAL_SERVER_ERROR).entity(errMsg).build();
+ }
- LOG.debug("getNewApplication try #{} on SubCluster {}.", i, subClusterId);
+ /**
+ * Invoke GetNewApplication on one randomly chosen, non-blacklisted SubCluster.
+ *
+ * @param subClustersActive Active SubClusters.
+ * @param blackList SubClusters to skip; a SubCluster whose call fails is added to it.
+ * @param hsr HttpServletRequest.
+ * @param retryCount index of the current attempt (used for logging).
+ * @return the SubCluster's response, returned only when it is non-null with status SC_OK;
+ * any other outcome blacklists the SubCluster and throws, so the caller can retry.
+ * @throws YarnException if no SubCluster can be selected, or the call fails or is not SC_OK.
+ * @throws IOException io error.
+ * @throws InterruptedException interrupted exception.
+ */
+ private Response invokeGetNewApplication(Map<SubClusterId, SubClusterInfo> subClustersActive,
+ List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
+ throws YarnException, IOException, InterruptedException {
- DefaultRequestInterceptorREST interceptor =
- getOrCreateInterceptorForSubCluster(subClusterId,
- subClustersActive.get(subClusterId).getRMWebServiceAddress());
- Response response = null;
- try {
- response = interceptor.createNewApplication(hsr);
- } catch (Exception e) {
- LOG.warn("Unable to create a new ApplicationId in SubCluster {}.",
- subClusterId.getId(), e);
- }
+ SubClusterId subClusterId =
+ federationFacade.getRandomActiveSubCluster(subClustersActive, blackList);
- if (response != null &&
- response.getStatus() == HttpServletResponse.SC_OK) {
+ LOG.info("getNewApplication try #{} on SubCluster {}.", retryCount, subClusterId);
- long stopTime = clock.getTime();
- routerMetrics.succeededAppsCreated(stopTime - startTime);
+ DefaultRequestInterceptorREST interceptor = getOrCreateInterceptorForSubCluster(subClusterId,
+ subClustersActive.get(subClusterId).getRMWebServiceAddress());
+ try {
+ Response response = interceptor.createNewApplication(hsr);
+ if (response != null && response.getStatus() == HttpServletResponse.SC_OK) {
 return response;
- } else {
- // Empty response from the ResourceManager.
- // Blacklist this subcluster for this request.
- blacklist.add(subClusterId);
 }
+ } catch (Exception e) {
+ blackList.add(subClusterId);
+ RouterServerUtil.logAndThrowException(e.getMessage(), e);
 }
- String errMsg = "Fail to create a new application.";
- LOG.error(errMsg);
- routerMetrics.incrAppsFailedCreated();
- return Response
- .status(Status.INTERNAL_SERVER_ERROR)
- .entity(errMsg)
- .build();
+ blackList.add(subClusterId); // Empty or non-OK response: avoid this SubCluster on retry.
+ String msg = String.format("Unable to create a new ApplicationId in SubCluster %s.",
+ subClusterId.getId());
+ throw new YarnException(msg);
 }
/**
@@ -431,142 +433,106 @@ public class FederationInterceptorREST extends AbstractRESTRequestInterceptor {
long startTime = clock.getTime();
+ // We verify the parameters to ensure that newApp is not empty and
+ // that the format of applicationId is correct.
if (newApp == null || newApp.getApplicationId() == null) {
routerMetrics.incrAppsFailedSubmitted();
String errMsg = "Missing ApplicationSubmissionContextInfo or "
+ "applicationSubmissionContext information.";
- return Response
- .status(Status.BAD_REQUEST)
- .entity(errMsg)
- .build();
+ return Response.status(Status.BAD_REQUEST).entity(errMsg).build();
}
- ApplicationId applicationId = null;
try {
- applicationId = ApplicationId.fromString(newApp.getApplicationId());
+ String applicationId = newApp.getApplicationId();
+ RouterServerUtil.validateApplicationId(applicationId);
} catch (IllegalArgumentException e) {
routerMetrics.incrAppsFailedSubmitted();
- return Response
- .status(Status.BAD_REQUEST)
- .entity(e.getLocalizedMessage())
- .build();
+ return Response.status(Status.BAD_REQUEST).entity(e.getLocalizedMessage()).build();
}
- List<SubClusterId> blacklist = new ArrayList<>();
-
- for (int i = 0; i < numSubmitRetries; ++i) {
-
- ApplicationSubmissionContext context =
- RMWebAppUtil.createAppSubmissionContext(newApp, this.getConf());
-
- SubClusterId subClusterId = null;
- try {
- subClusterId = policyFacade.getHomeSubcluster(context, blacklist);
- } catch (YarnException e) {
- routerMetrics.incrAppsFailedSubmitted();
- return Response
- .status(Status.SERVICE_UNAVAILABLE)
- .entity(e.getLocalizedMessage())
- .build();
+ List<SubClusterId> blackList = new ArrayList<>();
+ try {
+ int activeSubClustersCount = federationFacade.getActiveSubClustersCount();
+ int actualRetryNums = Math.min(activeSubClustersCount, numSubmitRetries);
+ Response response = ((FederationActionRetry<Response>) (retryCount) ->
+ invokeSubmitApplication(newApp, blackList, hsr, retryCount)).
+ runWithRetries(actualRetryNums, submitIntervalTime);
+ if (response != null) {
+ long stopTime = clock.getTime();
+ routerMetrics.succeededAppsSubmitted(stopTime - startTime);
+ return response;
}
- LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
- applicationId, i, subClusterId);
+ } catch (Exception e) {
+ routerMetrics.incrAppsFailedSubmitted();
+ return Response.status(Status.SERVICE_UNAVAILABLE).entity(e.getLocalizedMessage()).build();
+ }
- ApplicationHomeSubCluster appHomeSubCluster =
- ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
+ routerMetrics.incrAppsFailedSubmitted();
+ String errMsg = String.format("Application %s with appId %s failed to be submitted.",
+ newApp.getApplicationName(), newApp.getApplicationId());
+ LOG.error(errMsg);
+ return Response.status(Status.SERVICE_UNAVAILABLE).entity(errMsg).build();
+ }
- if (i == 0) {
- try {
- // persist the mapping of applicationId and the subClusterId which has
- // been selected as its home
- subClusterId =
- federationFacade.addApplicationHomeSubCluster(appHomeSubCluster);
- } catch (YarnException e) {
- routerMetrics.incrAppsFailedSubmitted();
- String errMsg = "Unable to insert the ApplicationId " + applicationId
- + " into the FederationStateStore";
- return Response
- .status(Status.SERVICE_UNAVAILABLE)
- .entity(errMsg + " " + e.getLocalizedMessage())
- .build();
- }
- } else {
- try {
- // update the mapping of applicationId and the home subClusterId to
- // the new subClusterId we have selected
- federationFacade.updateApplicationHomeSubCluster(appHomeSubCluster);
- } catch (YarnException e) {
- String errMsg = "Unable to update the ApplicationId " + applicationId
- + " into the FederationStateStore";
- SubClusterId subClusterIdInStateStore;
- try {
- subClusterIdInStateStore =
- federationFacade.getApplicationHomeSubCluster(applicationId);
- } catch (YarnException e1) {
- routerMetrics.incrAppsFailedSubmitted();
- return Response
- .status(Status.SERVICE_UNAVAILABLE)
- .entity(e1.getLocalizedMessage())
- .build();
- }
- if (subClusterId == subClusterIdInStateStore) {
- LOG.info("Application {} already submitted on SubCluster {}.",
- applicationId, subClusterId);
- } else {
- routerMetrics.incrAppsFailedSubmitted();
- return Response
- .status(Status.SERVICE_UNAVAILABLE)
- .entity(errMsg)
- .build();
- }
- }
- }
+ /**
+ * Invoke SubmitApplication to different subClusters.
+ *
+ * @param submissionContext application submission context.
+ * @param blackList subClusters passed to the policy as a blacklist; the selected subCluster is added on failure to avoid calling it again.
+ * @param hsr HttpServletRequest.
+ * @param retryCount number of retries.
+ * @return the subCluster's response, returned only when it is non-null and its
+ * status is SC_ACCEPTED; otherwise an exception is thrown (after blacklisting the
+ * selected subCluster, if any) so that the caller can retry.
+ * @throws YarnException yarn exception.
+ * @throws IOException io error.
+ */
+ private Response invokeSubmitApplication(ApplicationSubmissionContextInfo submissionContext,
+ List<SubClusterId> blackList, HttpServletRequest hsr, int retryCount)
+ throws YarnException, IOException, InterruptedException {
+
+ // Step1. We convert ApplicationSubmissionContextInfo to ApplicationSubmissionContext
+ // and Prepare parameters.
+ ApplicationSubmissionContext context =
+ RMWebAppUtil.createAppSubmissionContext(submissionContext, this.getConf());
+ ApplicationId applicationId = ApplicationId.fromString(submissionContext.getApplicationId());
+ SubClusterId subClusterId = null;
- SubClusterInfo subClusterInfo;
- try {
- subClusterInfo = federationFacade.getSubCluster(subClusterId);
- } catch (YarnException e) {
- routerMetrics.incrAppsFailedSubmitted();
- return Response
- .status(Status.SERVICE_UNAVAILABLE)
- .entity(e.getLocalizedMessage())
- .build();
- }
+ try {
+ // Get subClusterId from policy.
+ subClusterId = policyFacade.getHomeSubcluster(context, blackList);
- Response response = null;
- try {
- response = getOrCreateInterceptorForSubCluster(subClusterId,
- subClusterInfo.getRMWebServiceAddress()).submitApplication(newApp,
- hsr);
- } catch (Exception e) {
- LOG.warn("Unable to submit the application {} to SubCluster {}",
- applicationId, subClusterId.getId(), e);
- }
+ // Print the log of submitting the submitApplication.
+ LOG.info("submitApplication appId {} try #{} on SubCluster {}.",
+ applicationId, retryCount, subClusterId);
- if (response != null &&
- response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
- LOG.info("Application {} with appId {} submitted on {}",
- context.getApplicationName(), applicationId, subClusterId);
+ // Step2. We Store the mapping relationship
+ // between Application and HomeSubCluster in stateStore.
+ federationFacade.addOrUpdateApplicationHomeSubCluster(
+ applicationId, subClusterId, retryCount);
- long stopTime = clock.getTime();
- routerMetrics.succeededAppsSubmitted(stopTime - startTime);
+ // Step3. We get subClusterInfo based on subClusterId.
+ SubClusterInfo subClusterInfo = federationFacade.getSubCluster(subClusterId);
+ // Step4. Submit the request, if the response is HttpServletResponse.SC_ACCEPTED,
+ // We return the response, otherwise we throw an exception.
+ Response response = getOrCreateInterceptorForSubCluster(subClusterId,
+ subClusterInfo.getRMWebServiceAddress()).submitApplication(submissionContext, hsr);
+ if (response != null && response.getStatus() == HttpServletResponse.SC_ACCEPTED) {
+ LOG.info("Application {} with appId {} submitted on {}.",
+ context.getApplicationName(), applicationId, subClusterId);
return response;
- } else {
- // Empty response from the ResourceManager.
- // Blacklist this subcluster for this request.
- blacklist.add(subClusterId);
}
+ String msg = String.format("application %s failed to be submitted.", applicationId);
+ throw new YarnException(msg);
+ } catch (Exception e) {
+ LOG.warn("Unable to submit the application {} to SubCluster {}.", applicationId,
+ subClusterId, e);
+ if (subClusterId != null) {
+ blackList.add(subClusterId);
+ }
+ throw e;
}
-
- routerMetrics.incrAppsFailedSubmitted();
- String errMsg = "Application " + newApp.getApplicationName()
- + " with appId " + applicationId + " failed to be submitted.";
- LOG.error(errMsg);
- return Response
- .status(Status.SERVICE_UNAVAILABLE)
- .entity(errMsg)
- .build();
}
/**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
index e598a52f877..e2b2103c7de 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-router/src/test/java/org/apache/hadoop/yarn/server/router/webapp/TestFederationInterceptorRESTRetry.java
@@ -180,6 +180,9 @@ public class TestFederationInterceptorRESTRetry
@Test
public void testGetNewApplicationTwoBadSCs()
throws YarnException, IOException, InterruptedException {
+
+ LOG.info("Test getNewApplication with two bad SCs.");
+
setupCluster(Arrays.asList(bad1, bad2));
Response response = interceptor.createNewApplication(null);
@@ -195,17 +198,21 @@ public class TestFederationInterceptorRESTRetry
@Test
public void testGetNewApplicationOneBadOneGood()
throws YarnException, IOException, InterruptedException {
- System.out.println("Test getNewApplication with one bad, one good SC");
+
+ LOG.info("Test getNewApplication with one bad, one good SC.");
+
setupCluster(Arrays.asList(good, bad2));
Response response = interceptor.createNewApplication(null);
-
+ Assert.assertNotNull(response);
Assert.assertEquals(OK, response.getStatus());
NewApplication newApp = (NewApplication) response.getEntity();
+ Assert.assertNotNull(newApp);
+
ApplicationId appId = ApplicationId.fromString(newApp.getApplicationId());
+ Assert.assertNotNull(appId);
- Assert.assertEquals(Integer.parseInt(good.getId()),
- appId.getClusterTimestamp());
+ Assert.assertEquals(Integer.parseInt(good.getId()), appId.getClusterTimestamp());
}
/**
@@ -216,6 +223,8 @@ public class TestFederationInterceptorRESTRetry
public void testSubmitApplicationOneBadSC()
throws YarnException, IOException, InterruptedException {
+ LOG.info("Test submitApplication with one bad SC.");
+
setupCluster(Arrays.asList(bad2));
ApplicationId appId =
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org