You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2023/02/22 20:37:59 UTC

[hadoop] branch trunk updated: YARN-11370. [Federation] Refactor MemoryFederationStateStore code. (#5126)

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e997d818d0 YARN-11370. [Federation] Refactor MemoryFederationStateStore code. (#5126)
2e997d818d0 is described below

commit 2e997d818d00ff9ca868afd895648fdaa380922d
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Thu Feb 23 04:37:35 2023 +0800

    YARN-11370. [Federation] Refactor MemoryFederationStateStore code. (#5126)
---
 .../store/impl/MemoryFederationStateStore.java     | 112 +++++++++++----------
 .../store/impl/FederationStateStoreBaseTest.java   |  19 ++--
 2 files changed, 66 insertions(+), 65 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index b91de3ae808..03fb19a173d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
 import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
 import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -107,7 +108,7 @@ import static org.apache.hadoop.yarn.server.federation.store.utils.FederationSta
 public class MemoryFederationStateStore implements FederationStateStore {
 
   private Map<SubClusterId, SubClusterInfo> membership;
-  private Map<ApplicationId, SubClusterId> applications;
+  private Map<ApplicationId, ApplicationHomeSubCluster> applications;
   private Map<ReservationId, SubClusterId> reservations;
   private Map<String, SubClusterPolicyConfiguration> policies;
   private RouterRMDTSecretManagerState routerRMSecretManagerState;
@@ -122,10 +123,10 @@ public class MemoryFederationStateStore implements FederationStateStore {
 
   @Override
   public void init(Configuration conf) {
-    membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
-    applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
-    reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
-    policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
+    membership = new ConcurrentHashMap<>();
+    applications = new ConcurrentHashMap<>();
+    reservations = new ConcurrentHashMap<>();
+    policies = new ConcurrentHashMap<>();
     routerRMSecretManagerState = new RouterRMDTSecretManagerState();
     maxAppsInStateStore = conf.getInt(
         YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
@@ -143,14 +144,15 @@ public class MemoryFederationStateStore implements FederationStateStore {
   }
 
   @Override
-  public SubClusterRegisterResponse registerSubCluster(
-      SubClusterRegisterRequest request) throws YarnException {
+  public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request)
+      throws YarnException {
+    long startTime = clock.getTime();
+
     FederationMembershipStateStoreInputValidator.validate(request);
     SubClusterInfo subClusterInfo = request.getSubClusterInfo();
 
     long currentTime =
         Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
-
     SubClusterInfo subClusterInfoToSave =
         SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
             subClusterInfo.getAMRMServiceAddress(),
@@ -161,18 +163,21 @@ public class MemoryFederationStateStore implements FederationStateStore {
             subClusterInfo.getCapability());
 
     membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave);
+    long stopTime = clock.getTime();
+
+    FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
     return SubClusterRegisterResponse.newInstance();
   }
 
   @Override
-  public SubClusterDeregisterResponse deregisterSubCluster(
-      SubClusterDeregisterRequest request) throws YarnException {
+  public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request)
+      throws YarnException {
+
     FederationMembershipStateStoreInputValidator.validate(request);
     SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
     if (subClusterInfo == null) {
-      String errMsg =
-          "SubCluster " + request.getSubClusterId().toString() + " not found";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "SubCluster %s not found", request.getSubClusterId());
     } else {
       subClusterInfo.setState(request.getState());
     }
@@ -181,17 +186,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
   }
 
   @Override
-  public SubClusterHeartbeatResponse subClusterHeartbeat(
-      SubClusterHeartbeatRequest request) throws YarnException {
+  public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request)
+      throws YarnException {
 
     FederationMembershipStateStoreInputValidator.validate(request);
     SubClusterId subClusterId = request.getSubClusterId();
     SubClusterInfo subClusterInfo = membership.get(subClusterId);
 
     if (subClusterInfo == null) {
-      String errMsg = "SubCluster " + subClusterId.toString()
-          + " does not exist; cannot heartbeat";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(
+          LOG, "SubCluster %s does not exist; cannot heartbeat.", request.getSubClusterId());
     }
 
     long currentTime =
@@ -205,11 +209,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
   }
 
   @Override
-  public GetSubClusterInfoResponse getSubCluster(
-      GetSubClusterInfoRequest request) throws YarnException {
+  public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request)
+      throws YarnException {
 
     FederationMembershipStateStoreInputValidator.validate(request);
     SubClusterId subClusterId = request.getSubClusterId();
+
     if (!membership.containsKey(subClusterId)) {
       LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
       return null;
@@ -219,16 +224,17 @@ public class MemoryFederationStateStore implements FederationStateStore {
   }
 
   @Override
-  public GetSubClustersInfoResponse getSubClusters(
-      GetSubClustersInfoRequest request) throws YarnException {
-    List<SubClusterInfo> result = new ArrayList<SubClusterInfo>();
+  public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request)
+      throws YarnException {
+
+    List<SubClusterInfo> result = new ArrayList<>();
 
     for (SubClusterInfo info : membership.values()) {
-      if (!request.getFilterInactiveSubClusters()
-          || info.getState().isActive()) {
+      if (!request.getFilterInactiveSubClusters() || info.getState().isActive()) {
         result.add(info);
       }
     }
+
     return GetSubClustersInfoResponse.newInstance(result);
   }
 
@@ -239,16 +245,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
       AddApplicationHomeSubClusterRequest request) throws YarnException {
 
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
-    ApplicationId appId =
-        request.getApplicationHomeSubCluster().getApplicationId();
+    ApplicationHomeSubCluster homeSubCluster = request.getApplicationHomeSubCluster();
+
+    ApplicationId appId = homeSubCluster.getApplicationId();
 
     if (!applications.containsKey(appId)) {
-      applications.put(appId,
-          request.getApplicationHomeSubCluster().getHomeSubCluster());
+      applications.put(appId, homeSubCluster);
     }
 
-    return AddApplicationHomeSubClusterResponse
-        .newInstance(applications.get(appId));
+    ApplicationHomeSubCluster respHomeSubCluster = applications.get(appId);
+    return AddApplicationHomeSubClusterResponse.newInstance(respHomeSubCluster.getHomeSubCluster());
   }
 
   @Override
@@ -256,15 +262,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
       UpdateApplicationHomeSubClusterRequest request) throws YarnException {
 
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
     ApplicationId appId =
         request.getApplicationHomeSubCluster().getApplicationId();
+
     if (!applications.containsKey(appId)) {
-      String errMsg = "Application " + appId + " does not exist";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Application %s does not exist.", appId);
     }
 
-    applications.put(appId,
-        request.getApplicationHomeSubCluster().getHomeSubCluster());
+    applications.put(appId, request.getApplicationHomeSubCluster());
     return UpdateApplicationHomeSubClusterResponse.newInstance();
   }
 
@@ -275,11 +282,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
     ApplicationId appId = request.getApplicationId();
     if (!applications.containsKey(appId)) {
-      String errMsg = "Application " + appId + " does not exist";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Application %s does not exist.", appId);
     }
 
-    return GetApplicationHomeSubClusterResponse.newInstance(appId, applications.get(appId));
+    return GetApplicationHomeSubClusterResponse.newInstance(appId,
+        applications.get(appId).getHomeSubCluster());
   }
 
   @Override
@@ -303,7 +311,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
   }
 
   private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) {
-    SubClusterId subClusterId = applications.get(applicationId);
+    SubClusterId subClusterId = applications.get(applicationId).getHomeSubCluster();
     return ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
   }
 
@@ -314,8 +322,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
     FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
     ApplicationId appId = request.getApplicationId();
     if (!applications.containsKey(appId)) {
-      String errMsg = "Application " + appId + " does not exist";
-      FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Application %s does not exist.", appId);
     }
 
     applications.remove(appId);
@@ -329,12 +337,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
     FederationPolicyStoreInputValidator.validate(request);
     String queue = request.getQueue();
     if (!policies.containsKey(queue)) {
-      LOG.warn("Policy for queue: {} does not exist.", queue);
+      LOG.warn("Policy for queue : {} does not exist.", queue);
       return null;
     }
 
-    return GetSubClusterPolicyConfigurationResponse
-        .newInstance(policies.get(queue));
+    return GetSubClusterPolicyConfigurationResponse.newInstance(policies.get(queue));
   }
 
   @Override
@@ -350,8 +357,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
   @Override
   public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
       GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
-    ArrayList<SubClusterPolicyConfiguration> result =
-        new ArrayList<SubClusterPolicyConfiguration>();
+    ArrayList<SubClusterPolicyConfiguration> result = new ArrayList<>();
     for (SubClusterPolicyConfiguration policy : policies.values()) {
       result.add(policy);
     }
@@ -386,7 +392,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
     FederationReservationHomeSubClusterStoreInputValidator.validate(request);
     ReservationId reservationId = request.getReservationId();
     if (!reservations.containsKey(reservationId)) {
-      throw new YarnException("Reservation " + reservationId + " does not exist");
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Reservation %s does not exist.", reservationId);
     }
     SubClusterId subClusterId = reservations.get(reservationId);
     ReservationHomeSubCluster homeSubCluster =
@@ -417,7 +424,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
     ReservationId reservationId = request.getReservationHomeSubCluster().getReservationId();
 
     if (!reservations.containsKey(reservationId)) {
-      throw new YarnException("Reservation " + reservationId + " does not exist.");
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Reservation %s does not exist.", reservationId);
     }
 
     SubClusterId subClusterId = request.getReservationHomeSubCluster().getHomeSubCluster();
@@ -431,7 +439,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
     FederationReservationHomeSubClusterStoreInputValidator.validate(request);
     ReservationId reservationId = request.getReservationId();
     if (!reservations.containsKey(reservationId)) {
-      throw new YarnException("Reservation " + reservationId + " does not exist");
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Reservation %s does not exist.", reservationId);
     }
     reservations.remove(reservationId);
     return DeleteReservationHomeSubClusterResponse.newInstance();
@@ -446,9 +455,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
 
     Set<DelegationKey> rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState();
     if (rmDTMasterKeyState.contains(delegationKey)) {
-      LOG.info("Error storing info for RMDTMasterKey with keyID: {}.", delegationKey.getKeyId());
-      throw new IOException("RMDTMasterKey with keyID: " + delegationKey.getKeyId() +
-          " is already stored");
+      FederationStateStoreUtils.logAndThrowStoreException(LOG,
+          "Error storing info for RMDTMasterKey with keyID: %s.", delegationKey.getKeyId());
     }
 
     routerRMSecretManagerState.getMasterKeyState().add(delegationKey);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index c93115ccfd3..1952d47cb54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -164,12 +164,9 @@ public abstract class FederationStateStoreBaseTest {
 
     SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
         .newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
-    try {
-      stateStore.deregisterSubCluster(deregisterRequest);
-      Assert.fail();
-    } catch (FederationStateStoreException e) {
-      Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
-    }
+
+    LambdaTestUtils.intercept(YarnException.class,
+        "SubCluster SC not found", () -> stateStore.deregisterSubCluster(deregisterRequest));
   }
 
   @Test
@@ -266,13 +263,9 @@ public abstract class FederationStateStoreBaseTest {
     SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
         .newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");
 
-    try {
-      stateStore.subClusterHeartbeat(heartbeatRequest);
-      Assert.fail();
-    } catch (FederationStateStoreException e) {
-      Assert.assertTrue(e.getMessage()
-          .startsWith("SubCluster SC does not exist; cannot heartbeat"));
-    }
+    LambdaTestUtils.intercept(YarnException.class,
+        "SubCluster SC does not exist; cannot heartbeat",
+        () -> stateStore.subClusterHeartbeat(heartbeatRequest));
   }
 
   // Test FederationApplicationHomeSubClusterStore


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org