You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/06/20 23:23:08 UTC
[14/50] [abbrv] hadoop git commit: YARN-5407. In-memory based
implementation of the
FederationApplicationStateStore/FederationPolicyStateStore. (Ellen Hui via
Subru)
YARN-5407. In-memory based implementation of the FederationApplicationStateStore/FederationPolicyStateStore. (Ellen Hui via Subru)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fb8e1767
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fb8e1767
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fb8e1767
Branch: refs/heads/YARN-2915
Commit: fb8e17677ef821846e49b48cab5470614430f236
Parents: 2716da2
Author: Subru Krishnan <su...@apache.org>
Authored: Tue Aug 9 16:07:55 2016 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Tue Jun 20 16:07:34 2017 -0700
----------------------------------------------------------------------
.../store/impl/MemoryFederationStateStore.java | 158 +++++++-
...SubClusterPoliciesConfigurationsRequest.java | 2 +-
...ubClusterPoliciesConfigurationsResponse.java | 2 +-
...GetSubClusterPolicyConfigurationRequest.java | 3 +-
...etSubClusterPolicyConfigurationResponse.java | 2 +-
...SetSubClusterPolicyConfigurationRequest.java | 20 +-
...etSubClusterPolicyConfigurationResponse.java | 2 +-
.../records/SubClusterPolicyConfiguration.java | 27 +-
...tApplicationHomeSubClusterRequestPBImpl.java | 4 +
...ClusterPolicyConfigurationRequestPBImpl.java | 17 -
.../pb/SubClusterPolicyConfigurationPBImpl.java | 17 +
.../proto/yarn_server_federation_protos.proto | 8 +-
.../impl/FederationStateStoreBaseTest.java | 367 ++++++++++++++++++-
.../impl/TestMemoryFederationStateStore.java | 4 +-
14 files changed, 558 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
index cea4ac2..a540dff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/MemoryFederationStateStore.java
@@ -20,35 +20,72 @@ package org.apache.hadoop.yarn.server.federation.store.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.util.MonotonicClock;
/**
- * In-memory implementation of FederationMembershipStateStore.
+ * In-memory implementation of {@link FederationStateStore}.
*/
-public class MemoryFederationStateStore
- implements FederationMembershipStateStore {
+public class MemoryFederationStateStore implements FederationStateStore {
+
+ private Map<SubClusterId, SubClusterInfo> membership;
+ private Map<ApplicationId, SubClusterId> applications;
+ private Map<String, SubClusterPolicyConfiguration> policies;
- private final Map<SubClusterId, SubClusterInfo> membership =
- new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
private final MonotonicClock clock = new MonotonicClock();
@Override
+ public void init(Configuration conf) {
+ membership = new ConcurrentHashMap<SubClusterId, SubClusterInfo>();
+ applications = new ConcurrentHashMap<ApplicationId, SubClusterId>();
+ policies = new ConcurrentHashMap<String, SubClusterPolicyConfiguration>();
+ }
+
+ @Override
+ public void close() {
+ membership = null;
+ applications = null;
+ policies = null;
+ }
+
+ @Override
public SubClusterRegisterResponse registerSubCluster(
SubClusterRegisterRequest request) throws YarnException {
SubClusterInfo subClusterInfo = request.getSubClusterInfo();
@@ -116,4 +153,113 @@ public class MemoryFederationStateStore
return GetSubClustersInfoResponse.newInstance(result);
}
+ // FederationApplicationHomeSubClusterStore methods
+
+ @Override
+ public AddApplicationHomeSubClusterResponse addApplicationHomeSubClusterMap(
+ AddApplicationHomeSubClusterRequest request) throws YarnException {
+ ApplicationId appId =
+ request.getApplicationHomeSubCluster().getApplicationId();
+ if (applications.containsKey(appId)) {
+ throw new YarnException("Application " + appId + " already exists");
+ }
+
+ applications.put(appId,
+ request.getApplicationHomeSubCluster().getHomeSubCluster());
+ return AddApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ @Override
+ public UpdateApplicationHomeSubClusterResponse updateApplicationHomeSubClusterMap(
+ UpdateApplicationHomeSubClusterRequest request) throws YarnException {
+ ApplicationId appId =
+ request.getApplicationHomeSubCluster().getApplicationId();
+ if (!applications.containsKey(appId)) {
+ throw new YarnException("Application " + appId + " does not exist");
+ }
+
+ applications.put(appId,
+ request.getApplicationHomeSubCluster().getHomeSubCluster());
+ return UpdateApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ @Override
+ public GetApplicationHomeSubClusterResponse getApplicationHomeSubClusterMap(
+ GetApplicationHomeSubClusterRequest request) throws YarnException {
+ ApplicationId appId = request.getApplicationId();
+ if (!applications.containsKey(appId)) {
+ throw new YarnException("Application " + appId + " does not exist");
+ }
+
+ return GetApplicationHomeSubClusterResponse.newInstance(
+ ApplicationHomeSubCluster.newInstance(appId, applications.get(appId)));
+ }
+
+ @Override
+ public GetApplicationsHomeSubClusterResponse getApplicationsHomeSubClusterMap(
+ GetApplicationsHomeSubClusterRequest request) throws YarnException {
+ List<ApplicationHomeSubCluster> result =
+ new ArrayList<ApplicationHomeSubCluster>();
+ for (Entry<ApplicationId, SubClusterId> e : applications.entrySet()) {
+ result
+ .add(ApplicationHomeSubCluster.newInstance(e.getKey(), e.getValue()));
+ }
+
+ GetApplicationsHomeSubClusterResponse.newInstance(result);
+ return GetApplicationsHomeSubClusterResponse.newInstance(result);
+ }
+
+ @Override
+ public DeleteApplicationHomeSubClusterResponse deleteApplicationHomeSubClusterMap(
+ DeleteApplicationHomeSubClusterRequest request) throws YarnException {
+ ApplicationId appId = request.getApplicationId();
+ if (!applications.containsKey(appId)) {
+ throw new YarnException("Application " + appId + " does not exist");
+ }
+
+ applications.remove(appId);
+ return DeleteApplicationHomeSubClusterResponse.newInstance();
+ }
+
+ @Override
+ public GetSubClusterPolicyConfigurationResponse getPolicyConfiguration(
+ GetSubClusterPolicyConfigurationRequest request) throws YarnException {
+ String queue = request.getQueue();
+ if (!policies.containsKey(queue)) {
+ throw new YarnException("Policy for queue " + queue + " does not exist");
+ }
+
+ return GetSubClusterPolicyConfigurationResponse
+ .newInstance(policies.get(queue));
+ }
+
+ @Override
+ public SetSubClusterPolicyConfigurationResponse setPolicyConfiguration(
+ SetSubClusterPolicyConfigurationRequest request) throws YarnException {
+ policies.put(request.getPolicyConfiguration().getQueue(),
+ request.getPolicyConfiguration());
+ return SetSubClusterPolicyConfigurationResponse.newInstance();
+ }
+
+ @Override
+ public GetSubClusterPoliciesConfigurationsResponse getPoliciesConfigurations(
+ GetSubClusterPoliciesConfigurationsRequest request) throws YarnException {
+ ArrayList<SubClusterPolicyConfiguration> result =
+ new ArrayList<SubClusterPolicyConfiguration>();
+ for (SubClusterPolicyConfiguration policy : policies.values()) {
+ result.add(policy);
+ }
+ return GetSubClusterPoliciesConfigurationsResponse.newInstance(result);
+ }
+
+ @Override
+ public Version getCurrentVersion() {
+ return null;
+ }
+
+ @Override
+ public Version loadVersion() {
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java
index 404521b..8cb84f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsRequest.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.yarn.util.Records;
@Private
@Unstable
public abstract class GetSubClusterPoliciesConfigurationsRequest {
- public GetSubClusterPoliciesConfigurationsRequest newInstance() {
+ public static GetSubClusterPoliciesConfigurationsRequest newInstance() {
return Records.newRecord(GetSubClusterPoliciesConfigurationsRequest.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java
index 6554d68..2eaeb51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPoliciesConfigurationsResponse.java
@@ -36,7 +36,7 @@ public abstract class GetSubClusterPoliciesConfigurationsResponse {
@Private
@Unstable
- public GetSubClusterPoliciesConfigurationsResponse newInstance(
+ public static GetSubClusterPoliciesConfigurationsResponse newInstance(
List<SubClusterPolicyConfiguration> policyConfigurations) {
GetSubClusterPoliciesConfigurationsResponse response =
Records.newRecord(GetSubClusterPoliciesConfigurationsResponse.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java
index 7b7d8c4..c3f49e1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationRequest.java
@@ -33,7 +33,8 @@ public abstract class GetSubClusterPolicyConfigurationRequest {
@Private
@Unstable
- public GetSubClusterPolicyConfigurationRequest newInstance(String queueName) {
+ public static GetSubClusterPolicyConfigurationRequest newInstance(
+ String queueName) {
GetSubClusterPolicyConfigurationRequest request =
Records.newRecord(GetSubClusterPolicyConfigurationRequest.class);
request.setQueue(queueName);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java
index 11a46e0..350b239 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/GetSubClusterPolicyConfigurationResponse.java
@@ -34,7 +34,7 @@ public abstract class GetSubClusterPolicyConfigurationResponse {
@Private
@Unstable
- public GetSubClusterPolicyConfigurationResponse newInstance(
+ public static GetSubClusterPolicyConfigurationResponse newInstance(
SubClusterPolicyConfiguration policy) {
GetSubClusterPolicyConfigurationResponse response =
Records.newRecord(GetSubClusterPolicyConfigurationResponse.class);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java
index 06d5399..743ad0e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationRequest.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.util.Records;
public abstract class SetSubClusterPolicyConfigurationRequest {
@Private
@Unstable
- public SetSubClusterPolicyConfigurationRequest newInstance(
+ public static SetSubClusterPolicyConfigurationRequest newInstance(
SubClusterPolicyConfiguration policy) {
SetSubClusterPolicyConfigurationRequest request =
Records.newRecord(SetSubClusterPolicyConfigurationRequest.class);
@@ -41,24 +41,6 @@ public abstract class SetSubClusterPolicyConfigurationRequest {
}
/**
- * Get the name of the queue for which we are configuring a policy.
- *
- * @return the name of the queue
- */
- @Public
- @Unstable
- public abstract String getQueue();
-
- /**
- * Sets the name of the queue for which we are configuring a policy.
- *
- * @param queueName the name of the queue
- */
- @Private
- @Unstable
- public abstract void setQueue(String queueName);
-
- /**
* Get the policy configuration assigned to the queue.
*
* @return the policy for the specified queue
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java
index 33c4043..401e984 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SetSubClusterPolicyConfigurationResponse.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.yarn.util.Records;
@Private
@Unstable
public abstract class SetSubClusterPolicyConfigurationResponse {
- public SetSubClusterPolicyConfigurationResponse newInstance() {
+ public static SetSubClusterPolicyConfigurationResponse newInstance() {
return Records.newRecord(SetSubClusterPolicyConfigurationResponse.class);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
index bc12acb..2839139 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
@@ -29,8 +29,8 @@ import java.nio.ByteBuffer;
/**
* {@link SubClusterPolicyConfiguration} is a class that represents a
- * configuration of a policy. It contains a policy type (resolve to a class
- * name) and its params as an opaque {@link ByteBuffer}.
+ * configuration of a policy. For a single queue, it contains a policy type
+ * (resolve to a class name) and its params as an opaque {@link ByteBuffer}.
*
* Note: by design the params are an opaque ByteBuffer, this allows for enough
* flexibility to evolve the policies without impacting the protocols to/from
@@ -42,16 +42,35 @@ public abstract class SubClusterPolicyConfiguration {
@Private
@Unstable
- public static SubClusterPolicyConfiguration newInstance(String policyType,
- ByteBuffer policyParams) {
+ public static SubClusterPolicyConfiguration newInstance(String queue,
+ String policyType, ByteBuffer policyParams) {
SubClusterPolicyConfiguration policy =
Records.newRecord(SubClusterPolicyConfiguration.class);
+ policy.setQueue(queue);
policy.setType(policyType);
policy.setParams(policyParams);
return policy;
}
/**
+ * Get the name of the queue for which we are configuring a policy.
+ *
+ * @return the name of the queue
+ */
+ @Public
+ @Unstable
+ public abstract String getQueue();
+
+ /**
+ * Sets the name of the queue for which we are configuring a policy.
+ *
+ * @param queueName the name of the queue
+ */
+ @Private
+ @Unstable
+ public abstract void setQueue(String queueName);
+
+ /**
* Get the type of the policy. This could be random, round-robin, load-based,
* etc.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java
index 865d0c4..585ba81 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/GetApplicationHomeSubClusterRequestPBImpl.java
@@ -108,6 +108,10 @@ public class GetApplicationHomeSubClusterRequestPBImpl
public ApplicationId getApplicationId() {
GetApplicationHomeSubClusterRequestProtoOrBuilder p =
viaProto ? proto : builder;
+ if (applicationId != null) {
+ return applicationId;
+ }
+
if (!p.hasApplicationId()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java
index 5e29bd5..7b7f89d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SetSubClusterPolicyConfigurationRequestPBImpl.java
@@ -107,23 +107,6 @@ public class SetSubClusterPolicyConfigurationRequestPBImpl
}
@Override
- public String getQueue() {
- SetSubClusterPolicyConfigurationRequestProtoOrBuilder p =
- viaProto ? proto : builder;
- return p.getQueue();
- }
-
- @Override
- public void setQueue(String queueName) {
- maybeInitBuilder();
- if (queueName == null) {
- builder.clearQueue();
- return;
- }
- builder.setQueue(queueName);
- }
-
- @Override
public SubClusterPolicyConfiguration getPolicyConfiguration() {
SetSubClusterPolicyConfigurationRequestProtoOrBuilder p =
viaProto ? proto : builder;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java
index fe9d9db..305a8d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/impl/pb/SubClusterPolicyConfigurationPBImpl.java
@@ -87,6 +87,23 @@ public class SubClusterPolicyConfigurationPBImpl
}
@Override
+ public String getQueue() {
+ SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder;
+ return p.getQueue();
+ }
+
+ @Override
+ public void setQueue(String queueName) {
+ maybeInitBuilder();
+ if (queueName == null) {
+ builder.clearType();
+ return;
+ }
+ builder.setQueue(queueName);
+
+ }
+
+ @Override
public String getType() {
SubClusterPolicyConfigurationProtoOrBuilder p = viaProto ? proto : builder;
return p.getType();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
index 3f1cee9..11f786f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_federation_protos.proto
@@ -136,8 +136,9 @@ message DeleteApplicationHomeSubClusterResponseProto {
}
message SubClusterPolicyConfigurationProto {
- optional string type = 1;
- optional bytes params = 2;
+ optional string queue = 1;
+ optional string type = 2;
+ optional bytes params = 3;
}
message GetSubClusterPolicyConfigurationRequestProto {
@@ -149,8 +150,7 @@ message GetSubClusterPolicyConfigurationResponseProto {
}
message SetSubClusterPolicyConfigurationRequestProto {
- optional string queue = 1;
- optional SubClusterPolicyConfigurationProto policy_configuration = 2;
+ optional SubClusterPolicyConfigurationProto policy_configuration = 1;
}
message SetSubClusterPolicyConfigurationResponseProto {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
index c76a485..165dd78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/FederationStateStoreBaseTest.java
@@ -18,18 +18,39 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.AddApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.ApplicationHomeSubCluster;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPoliciesConfigurationsResponse;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.GetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetSubClustersInfoRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.SetSubClusterPolicyConfigurationResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterHeartbeatRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
+import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.junit.After;
import org.junit.Assert;
@@ -42,20 +63,21 @@ import org.junit.Test;
public abstract class FederationStateStoreBaseTest {
private static final MonotonicClock CLOCK = new MonotonicClock();
+ private FederationStateStore stateStore = createStateStore();
- private FederationMembershipStateStore stateStore;
+ protected abstract FederationStateStore createStateStore();
@Before
- public void before() throws IOException {
- stateStore = getCleanStateStore();
+ public void before() throws IOException, YarnException {
+ stateStore.init(new Configuration());
}
@After
- public void after() {
- stateStore = null;
+ public void after() throws Exception {
+ stateStore.close();
}
- protected abstract FederationMembershipStateStore getCleanStateStore();
+ // Test FederationMembershipStateStore
@Test
public void testRegisterSubCluster() throws Exception {
@@ -72,10 +94,7 @@ public abstract class FederationStateStoreBaseTest {
@Test
public void testDeregisterSubCluster() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
- SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
-
- stateStore.registerSubCluster(
- SubClusterRegisterRequest.newInstance(subClusterInfo));
+ registerSubCluster(subClusterId);
SubClusterDeregisterRequest deregisterRequest = SubClusterDeregisterRequest
.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED);
@@ -105,9 +124,7 @@ public abstract class FederationStateStoreBaseTest {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
-
- stateStore.registerSubCluster(
- SubClusterRegisterRequest.newInstance(subClusterInfo));
+ registerSubCluster(subClusterId);
GetSubClusterInfoRequest request =
GetSubClusterInfoRequest.newInstance(subClusterId);
@@ -167,10 +184,7 @@ public abstract class FederationStateStoreBaseTest {
@Test
public void testSubClusterHeartbeat() throws Exception {
SubClusterId subClusterId = SubClusterId.newInstance("SC");
- SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
-
- stateStore.registerSubCluster(
- SubClusterRegisterRequest.newInstance(subClusterInfo));
+ registerSubCluster(subClusterId);
SubClusterHeartbeatRequest heartbeatRequest = SubClusterHeartbeatRequest
.newInstance(subClusterId, SubClusterState.SC_RUNNING, "cabability");
@@ -196,6 +210,271 @@ public abstract class FederationStateStoreBaseTest {
}
}
+ // Test FederationApplicationHomeSubClusterStore
+
+ @Test
+ public void testAddApplicationHomeSubClusterMap() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ SubClusterId subClusterId = SubClusterId.newInstance("SC");
+ ApplicationHomeSubCluster ahsc =
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+
+ AddApplicationHomeSubClusterRequest request =
+ AddApplicationHomeSubClusterRequest.newInstance(ahsc);
+ AddApplicationHomeSubClusterResponse response =
+ stateStore.addApplicationHomeSubClusterMap(request);
+
+ Assert.assertNotNull(response);
+ Assert.assertEquals(subClusterId, queryApplicationHomeSC(appId));
+
+ }
+
+ @Test
+ public void testAddApplicationHomeSubClusterMapAppAlreadyExists()
+ throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+ addApplicationHomeSC(appId, subClusterId1);
+
+ SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
+ ApplicationHomeSubCluster ahsc2 =
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId2);
+
+ try {
+ stateStore.addApplicationHomeSubClusterMap(
+ AddApplicationHomeSubClusterRequest.newInstance(ahsc2));
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " already exists"));
+ }
+
+ Assert.assertEquals(subClusterId1, queryApplicationHomeSC(appId));
+
+ }
+
+ @Test
+ public void testDeleteApplicationHomeSubClusterMap() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ SubClusterId subClusterId = SubClusterId.newInstance("SC");
+ addApplicationHomeSC(appId, subClusterId);
+
+ DeleteApplicationHomeSubClusterRequest delRequest =
+ DeleteApplicationHomeSubClusterRequest.newInstance(appId);
+
+ DeleteApplicationHomeSubClusterResponse response =
+ stateStore.deleteApplicationHomeSubClusterMap(delRequest);
+
+ Assert.assertNotNull(response);
+ try {
+ queryApplicationHomeSC(appId);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId + " does not exist"));
+ }
+
+ }
+
+ @Test
+ public void testDeleteApplicationHomeSubClusterMapUnknownApp()
+ throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ DeleteApplicationHomeSubClusterRequest delRequest =
+ DeleteApplicationHomeSubClusterRequest.newInstance(appId);
+
+ try {
+ stateStore.deleteApplicationHomeSubClusterMap(delRequest);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " does not exist"));
+ }
+ }
+
+ @Test
+ public void testGetApplicationHomeSubClusterMap() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ SubClusterId subClusterId = SubClusterId.newInstance("SC");
+ addApplicationHomeSC(appId, subClusterId);
+
+ GetApplicationHomeSubClusterRequest getRequest =
+ GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+ GetApplicationHomeSubClusterResponse result =
+ stateStore.getApplicationHomeSubClusterMap(getRequest);
+
+ Assert.assertEquals(appId,
+ result.getApplicationHomeSubCluster().getApplicationId());
+ Assert.assertEquals(subClusterId,
+ result.getApplicationHomeSubCluster().getHomeSubCluster());
+ }
+
+ @Test
+ public void testGetApplicationHomeSubClusterMapUnknownApp() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ GetApplicationHomeSubClusterRequest request =
+ GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+ try {
+ stateStore.getApplicationHomeSubClusterMap(request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " does not exist"));
+ }
+ }
+
+ @Test
+ public void testGetApplicationsHomeSubClusterMap() throws Exception {
+ ApplicationId appId1 = ApplicationId.newInstance(1, 1);
+ SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+ ApplicationHomeSubCluster ahsc1 =
+ ApplicationHomeSubCluster.newInstance(appId1, subClusterId1);
+
+ ApplicationId appId2 = ApplicationId.newInstance(1, 2);
+ SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
+ ApplicationHomeSubCluster ahsc2 =
+ ApplicationHomeSubCluster.newInstance(appId2, subClusterId2);
+
+ addApplicationHomeSC(appId1, subClusterId1);
+ addApplicationHomeSC(appId2, subClusterId2);
+
+ GetApplicationsHomeSubClusterRequest getRequest =
+ GetApplicationsHomeSubClusterRequest.newInstance();
+
+ GetApplicationsHomeSubClusterResponse result =
+ stateStore.getApplicationsHomeSubClusterMap(getRequest);
+
+ Assert.assertEquals(2, result.getAppsHomeSubClusters().size());
+ Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc1));
+ Assert.assertTrue(result.getAppsHomeSubClusters().contains(ahsc2));
+ }
+
+ @Test
+ public void testUpdateApplicationHomeSubClusterMap() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+ addApplicationHomeSC(appId, subClusterId1);
+
+ SubClusterId subClusterId2 = SubClusterId.newInstance("SC2");
+ ApplicationHomeSubCluster ahscUpdate =
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId2);
+
+ UpdateApplicationHomeSubClusterRequest updateRequest =
+ UpdateApplicationHomeSubClusterRequest.newInstance(ahscUpdate);
+
+ UpdateApplicationHomeSubClusterResponse response =
+ stateStore.updateApplicationHomeSubClusterMap(updateRequest);
+
+ Assert.assertNotNull(response);
+
+ Assert.assertEquals(subClusterId2, queryApplicationHomeSC(appId));
+ }
+
+ @Test
+ public void testUpdateApplicationHomeSubClusterMapUnknownApp()
+ throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ SubClusterId subClusterId1 = SubClusterId.newInstance("SC1");
+ ApplicationHomeSubCluster ahsc =
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId1);
+
+ UpdateApplicationHomeSubClusterRequest updateRequest =
+ UpdateApplicationHomeSubClusterRequest.newInstance(ahsc);
+
+ try {
+ stateStore.updateApplicationHomeSubClusterMap((updateRequest));
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(e.getMessage()
+ .startsWith("Application " + appId.toString() + " does not exist"));
+ }
+ }
+
+ // Test FederationPolicyStore
+
+ @Test
+ public void testSetPolicyConfiguration() throws Exception {
+ SetSubClusterPolicyConfigurationRequest request =
+ SetSubClusterPolicyConfigurationRequest
+ .newInstance(createSCPolicyConf("Queue", "PolicyType"));
+
+ SetSubClusterPolicyConfigurationResponse result =
+ stateStore.setPolicyConfiguration(request);
+
+ Assert.assertNotNull(result);
+ Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"),
+ queryPolicy("Queue"));
+
+ }
+
+ @Test
+ public void testSetPolicyConfigurationUpdateExisting() throws Exception {
+ setPolicyConf("Queue", "PolicyType1");
+
+ SetSubClusterPolicyConfigurationRequest request2 =
+ SetSubClusterPolicyConfigurationRequest
+ .newInstance(createSCPolicyConf("Queue", "PolicyType2"));
+ SetSubClusterPolicyConfigurationResponse result =
+ stateStore.setPolicyConfiguration(request2);
+
+ Assert.assertNotNull(result);
+ Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType2"),
+ queryPolicy("Queue"));
+ }
+
+ @Test
+ public void testGetPolicyConfiguration() throws Exception {
+ setPolicyConf("Queue", "PolicyType");
+
+ GetSubClusterPolicyConfigurationRequest getRequest =
+ GetSubClusterPolicyConfigurationRequest.newInstance("Queue");
+ GetSubClusterPolicyConfigurationResponse result =
+ stateStore.getPolicyConfiguration(getRequest);
+
+ Assert.assertNotNull(result);
+ Assert.assertEquals(createSCPolicyConf("Queue", "PolicyType"),
+ result.getPolicyConfiguration());
+
+ }
+
+ @Test
+ public void testGetPolicyConfigurationUnknownQueue() throws Exception {
+
+ GetSubClusterPolicyConfigurationRequest request =
+ GetSubClusterPolicyConfigurationRequest.newInstance("Queue");
+ try {
+ stateStore.getPolicyConfiguration(request);
+ Assert.fail();
+ } catch (YarnException e) {
+ Assert.assertTrue(
+ e.getMessage().startsWith("Policy for queue Queue does not exist"));
+ }
+ }
+
+ @Test
+ public void testGetPoliciesConfigurations() throws Exception {
+ setPolicyConf("Queue1", "PolicyType1");
+ setPolicyConf("Queue2", "PolicyType2");
+
+ GetSubClusterPoliciesConfigurationsResponse response =
+ stateStore.getPoliciesConfigurations(
+ GetSubClusterPoliciesConfigurationsRequest.newInstance());
+
+ Assert.assertNotNull(response);
+ Assert.assertNotNull(response.getPoliciesConfigs());
+
+ Assert.assertEquals(2, response.getPoliciesConfigs().size());
+
+ Assert.assertTrue(response.getPoliciesConfigs()
+ .contains(createSCPolicyConf("Queue1", "PolicyType1")));
+ Assert.assertTrue(response.getPoliciesConfigs()
+ .contains(createSCPolicyConf("Queue2", "PolicyType2")));
+ }
+
+ // Convenience methods
+
private SubClusterInfo createSubClusterInfo(SubClusterId subClusterId) {
String amRMAddress = "1.2.3.4:1";
@@ -208,6 +487,37 @@ public abstract class FederationStateStoreBaseTest {
CLOCK.getTime(), "cabability");
}
+ private SubClusterPolicyConfiguration createSCPolicyConf(String queueName,
+ String policyType) {
+ return SubClusterPolicyConfiguration.newInstance(queueName, policyType,
+ ByteBuffer.allocate(1));
+ }
+
+ private void addApplicationHomeSC(ApplicationId appId,
+ SubClusterId subClusterId) throws YarnException {
+ ApplicationHomeSubCluster ahsc =
+ ApplicationHomeSubCluster.newInstance(appId, subClusterId);
+ AddApplicationHomeSubClusterRequest request =
+ AddApplicationHomeSubClusterRequest.newInstance(ahsc);
+ stateStore.addApplicationHomeSubClusterMap(request);
+ }
+
+ private void setPolicyConf(String queue, String policyType)
+ throws YarnException {
+ SetSubClusterPolicyConfigurationRequest request =
+ SetSubClusterPolicyConfigurationRequest
+ .newInstance(createSCPolicyConf(queue, policyType));
+ stateStore.setPolicyConfiguration(request);
+ }
+
+ private void registerSubCluster(SubClusterId subClusterId)
+ throws YarnException {
+
+ SubClusterInfo subClusterInfo = createSubClusterInfo(subClusterId);
+ stateStore.registerSubCluster(
+ SubClusterRegisterRequest.newInstance(subClusterInfo));
+ }
+
private SubClusterInfo querySubClusterInfo(SubClusterId subClusterId)
throws YarnException {
GetSubClusterInfoRequest request =
@@ -215,4 +525,25 @@ public abstract class FederationStateStoreBaseTest {
return stateStore.getSubCluster(request).getSubClusterInfo();
}
+ private SubClusterId queryApplicationHomeSC(ApplicationId appId)
+ throws YarnException {
+ GetApplicationHomeSubClusterRequest request =
+ GetApplicationHomeSubClusterRequest.newInstance(appId);
+
+ GetApplicationHomeSubClusterResponse response =
+ stateStore.getApplicationHomeSubClusterMap(request);
+
+ return response.getApplicationHomeSubCluster().getHomeSubCluster();
+ }
+
+ private SubClusterPolicyConfiguration queryPolicy(String queue)
+ throws YarnException {
+ GetSubClusterPolicyConfigurationRequest request =
+ GetSubClusterPolicyConfigurationRequest.newInstance(queue);
+
+ GetSubClusterPolicyConfigurationResponse result =
+ stateStore.getPolicyConfiguration(request);
+ return result.getPolicyConfiguration();
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fb8e1767/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
index 9396eda..74404c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/store/impl/TestMemoryFederationStateStore.java
@@ -17,7 +17,7 @@
package org.apache.hadoop.yarn.server.federation.store.impl;
-import org.apache.hadoop.yarn.server.federation.store.FederationMembershipStateStore;
+import org.apache.hadoop.yarn.server.federation.store.FederationStateStore;
/**
* Unit tests for MemoryFederationStateStore.
@@ -26,7 +26,7 @@ public class TestMemoryFederationStateStore
extends FederationStateStoreBaseTest {
@Override
- protected FederationMembershipStateStore getCleanStateStore() {
+ protected FederationStateStore createStateStore() {
return new MemoryFederationStateStore();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org