You are viewing a plain text version of this content. The canonical link to the original message was a hyperlink that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2023/02/22 20:37:59 UTC
[hadoop] branch trunk updated: YARN-11370. [Federation] Refactor MemoryFederationStateStore code. (#5126)
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e997d818d0 YARN-11370. [Federation] Refactor MemoryFederationStateStore code. (#5126)
2e997d818d0 is described below
commit 2e997d818d00ff9ca868afd895648fdaa380922d
Author: slfan1989 <55...@users.noreply.github.com>
AuthorDate: Thu Feb 23 04:37:35 2023 +0800
YARN-11370. [Federation] Refactor MemoryFederationStateStore code. (#5126)
---
.../store/impl/MemoryFederationStateStore.java | 112 +++++++++++----------
.../store/impl/FederationStateStoreBaseTest.java | 19 ++--
2 files changed, 66 insertions(+), 65 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index b91de3ae808..03fb19a173d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.metrics.FederationStateStoreClientMetrics;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
@@ -107,7 +108,7 @@ import static org.apache.hadoop.yarn.server.federation.store.utils.FederationSta
public class MemoryFederationStateStore implements FederationStateStore {
private Map<SubClusterId, SubClusterInfo> membership;
- private Map<ApplicationId, SubClusterId> applications;
+ private Map<ApplicationId, ApplicationHomeSubCluster> applications;
private Map<ReservationId, SubClusterId> reservations;
private Map<String, SubClusterPolicyConfiguration> policies;
private RouterRMDTSecretManagerState routerRMSecretManagerState;
@@ -122,10 +123,10 @@ public class MemoryFederationStateStore implements FederationStateStore {
@Override
public void init(Configuration conf) {
- membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
- applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
- reservations = new ConcurrentHashMap<ReservationId, SubClusterId>();
- policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
+ membership = new ConcurrentHashMap<>();
+ applications = new ConcurrentHashMap<>();
+ reservations = new ConcurrentHashMap<>();
+ policies = new ConcurrentHashMap<>();
routerRMSecretManagerState = new RouterRMDTSecretManagerState();
maxAppsInStateStore = conf.getInt(
YarnConfiguration.FEDERATION_STATESTORE_MAX_APPLICATIONS,
@@ -143,14 +144,15 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
- public SubClusterRegisterResponse registerSubCluster(
- SubClusterRegisterRequest request) throws YarnException {
+ public SubClusterRegisterResponse registerSubCluster(SubClusterRegisterRequest request)
+ throws YarnException {
+ long startTime = clock.getTime();
+
FederationMembershipStateStoreInputValidator.validate(request);
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
long currentTime =
Calendar.getInstance(TimeZone.getTimeZone("UTC")).getTimeInMillis();
-
SubClusterInfo subClusterInfoToSave =
SubClusterInfo.newInstance(subClusterInfo.getSubClusterId(),
subClusterInfo.getAMRMServiceAddress(),
@@ -161,18 +163,21 @@ public class MemoryFederationStateStore implements FederationStateStore {
subClusterInfo.getCapability());
membership.put(subClusterInfo.getSubClusterId(), subClusterInfoToSave);
+ long stopTime = clock.getTime();
+
+ FederationStateStoreClientMetrics.succeededStateStoreCall(stopTime - startTime);
return SubClusterRegisterResponse.newInstance();
}
@Override
- public SubClusterDeregisterResponse deregisterSubCluster(
- SubClusterDeregisterRequest request) throws YarnException {
+ public SubClusterDeregisterResponse deregisterSubCluster(SubClusterDeregisterRequest request)
+ throws YarnException {
+
FederationMembershipStateStoreInputValidator.validate(request);
SubClusterInfo subClusterInfo = membership.get(request.getSubClusterId());
if (subClusterInfo == null) {
- String errMsg =
- "SubCluster " + request.getSubClusterId().toString() + " not found";
- FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(
+ LOG, "SubCluster %s not found", request.getSubClusterId());
} else {
subClusterInfo.setState(request.getState());
}
@@ -181,17 +186,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
- public SubClusterHeartbeatResponse subClusterHeartbeat(
- SubClusterHeartbeatRequest request) throws YarnException {
+ public SubClusterHeartbeatResponse subClusterHeartbeat(SubClusterHeartbeatRequest request)
+ throws YarnException {
FederationMembershipStateStoreInputValidator.validate(request);
SubClusterId subClusterId = request.getSubClusterId();
SubClusterInfo subClusterInfo = membership.get(subClusterId);
if (subClusterInfo == null) {
- String errMsg = "SubCluster " + subClusterId.toString()
- + " does not exist; cannot heartbeat";
- FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(
+ LOG, "SubCluster %s does not exist; cannot heartbeat.", request.getSubClusterId());
}
long currentTime =
@@ -205,11 +209,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
- public GetSubClusterInfoResponse getSubCluster(
- GetSubClusterInfoRequest request) throws YarnException {
+ public GetSubClusterInfoResponse getSubCluster(GetSubClusterInfoRequest request)
+ throws YarnException {
FederationMembershipStateStoreInputValidator.validate(request);
SubClusterId subClusterId = request.getSubClusterId();
+
if (!membership.containsKey(subClusterId)) {
LOG.warn("The queried SubCluster: {} does not exist.", subClusterId);
return null;
@@ -219,16 +224,17 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
@Override
- public GetSubClustersInfoResponse getSubClusters(
- GetSubClustersInfoRequest request) throws YarnException {
- List<SubClusterInfo> result = new ArrayList<SubClusterInfo>();
+ public GetSubClustersInfoResponse getSubClusters(GetSubClustersInfoRequest request)
+ throws YarnException {
+
+ List<SubClusterInfo> result = new ArrayList<>();
for (SubClusterInfo info : membership.values()) {
- if (!request.getFilterInactiveSubClusters()
- || info.getState().isActive()) {
+ if (!request.getFilterInactiveSubClusters() || info.getState().isActive()) {
result.add(info);
}
}
+
return GetSubClustersInfoResponse.newInstance(result);
}
@@ -239,16 +245,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
AddApplicationHomeSubClusterRequest request) throws YarnException {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
- ApplicationId appId =
- request.getApplicationHomeSubCluster().getApplicationId();
+ ApplicationHomeSubCluster homeSubCluster = request.getApplicationHomeSubCluster();
+
+ ApplicationId appId = homeSubCluster.getApplicationId();
if (!applications.containsKey(appId)) {
- applications.put(appId,
- request.getApplicationHomeSubCluster().getHomeSubCluster());
+ applications.put(appId, homeSubCluster);
}
- return AddApplicationHomeSubClusterResponse
- .newInstance(applications.get(appId));
+ ApplicationHomeSubCluster respHomeSubCluster = applications.get(appId);
+ return AddApplicationHomeSubClusterResponse.newInstance(respHomeSubCluster.getHomeSubCluster());
}
@Override
@@ -256,15 +262,16 @@ public class MemoryFederationStateStore implements FederationStateStore {
UpdateApplicationHomeSubClusterRequest request) throws YarnException {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
+
ApplicationId appId =
request.getApplicationHomeSubCluster().getApplicationId();
+
if (!applications.containsKey(appId)) {
- String errMsg = "Application " + appId + " does not exist";
- FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Application %s does not exist.", appId);
}
- applications.put(appId,
- request.getApplicationHomeSubCluster().getHomeSubCluster());
+ applications.put(appId, request.getApplicationHomeSubCluster());
return UpdateApplicationHomeSubClusterResponse.newInstance();
}
@@ -275,11 +282,12 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
- String errMsg = "Application " + appId + " does not exist";
- FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Application %s does not exist.", appId);
}
- return GetApplicationHomeSubClusterResponse.newInstance(appId, applications.get(appId));
+ return GetApplicationHomeSubClusterResponse.newInstance(appId,
+ applications.get(appId).getHomeSubCluster());
}
@Override
@@ -303,7 +311,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
}
private ApplicationHomeSubCluster generateAppHomeSC(ApplicationId applicationId) {
- SubClusterId subClusterId = applications.get(applicationId);
+ SubClusterId subClusterId = applications.get(applicationId).getHomeSubCluster();
return ApplicationHomeSubCluster.newInstance(applicationId, subClusterId);
}
@@ -314,8 +322,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationApplicationHomeSubClusterStoreInputValidator.validate(request);
ApplicationId appId = request.getApplicationId();
if (!applications.containsKey(appId)) {
- String errMsg = "Application " + appId + " does not exist";
- FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Application %s does not exist.", appId);
}
applications.remove(appId);
@@ -329,12 +337,11 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationPolicyStoreInputValidator.validate(request);
String queue = request.getQueue();
if (!policies.containsKey(queue)) {
- LOG.warn("Policy for queue: {} does not exist.", queue);
+ LOG.warn("Policy for queue : {} does not exist.", queue);
return null;
}
- return GetSubClusterPolicyConfigurationResponse
- .newInstance(policies.get(queue));
+ return GetSubClusterPolicyConfigurationResponse.newInstance(policies.get(queue));
}
@Override
@@ -350,8 +357,7 @@ public class MemoryFederationStateStore implements FederationStateStore {
@Override
public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
- ArrayList<SubClusterPolicyConfiguration> result =
- new ArrayList<SubClusterPolicyConfiguration>();
+ ArrayList<SubClusterPolicyConfiguration> result = new ArrayList<>();
for (SubClusterPolicyConfiguration policy : policies.values()) {
result.add(policy);
}
@@ -386,7 +392,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
if (!reservations.containsKey(reservationId)) {
- throw new YarnException("Reservation " + reservationId + " does not exist");
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Reservation %s does not exist.", reservationId);
}
SubClusterId subClusterId = reservations.get(reservationId);
ReservationHomeSubCluster homeSubCluster =
@@ -417,7 +424,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
ReservationId reservationId = request.getReservationHomeSubCluster().getReservationId();
if (!reservations.containsKey(reservationId)) {
- throw new YarnException("Reservation " + reservationId + " does not exist.");
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Reservation %s does not exist.", reservationId);
}
SubClusterId subClusterId = request.getReservationHomeSubCluster().getHomeSubCluster();
@@ -431,7 +439,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
if (!reservations.containsKey(reservationId)) {
- throw new YarnException("Reservation " + reservationId + " does not exist");
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Reservation %s does not exist.", reservationId);
}
reservations.remove(reservationId);
return DeleteReservationHomeSubClusterResponse.newInstance();
@@ -446,9 +455,8 @@ public class MemoryFederationStateStore implements FederationStateStore {
Set<DelegationKey> rmDTMasterKeyState = routerRMSecretManagerState.getMasterKeyState();
if (rmDTMasterKeyState.contains(delegationKey)) {
- LOG.info("Error storing info for RMDTMasterKey with keyID: {}.", delegationKey.getKeyId());
- throw new IOException("RMDTMasterKey with keyID: " + delegationKey.getKeyId() +
- " is already stored");
+ FederationStateStoreUtils.logAndThrowStoreException(LOG,
+ "Error storing info for RMDTMasterKey with keyID: %s.", delegationKey.getKeyId());
}
routerRMSecretManagerState.getMasterKeyState().add(delegationKey);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index c93115ccfd3..1952d47cb54 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -164,12 +164,9 @@ public abstract class FederationStateStoreBaseTest {
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
- try {
- stateStore.deregisterSubCluster(deregisterRequest);
- Assert.fail();
- } catch (FederationStateStoreException e) {
- Assert.assertTrue(e.getMessage().startsWith("SubCluster SC not found"));
- }
+
+ LambdaTestUtils.intercept(YarnException.class,
+ "SubCluster SC not found", () -> stateStore.deregisterSubCluster(deregisterRequest));
}
@Test
@@ -266,13 +263,9 @@ public abstract class FederationStateStoreBaseTest {
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "capability");
- try {
- stateStore.subClusterHeartbeat(heartbeatRequest);
- Assert.fail();
- } catch (FederationStateStoreException e) {
- Assert.assertTrue(e.getMessage()
- .startsWith("SubCluster SC does not exist; cannot heartbeat"));
- }
+ LambdaTestUtils.intercept(YarnException.class,
+ "SubCluster SC does not exist; cannot heartbeat",
+ () -> stateStore.subClusterHeartbeat(heartbeatRequest));
}
// Test FederationApplicationHomeSubClusterStore
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org