You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2017/05/16 16:10:48 UTC
[31/50] [abbrv] hadoop git commit: YARN-5324. Stateless Federation
router policies implementation. (Carlo Curino via Subru).
YARN-5324. Stateless Federation router policies implementation. (Carlo Curino via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/94a87a4d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/94a87a4d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/94a87a4d
Branch: refs/heads/YARN-2915
Commit: 94a87a4d353cc736d0273fc43c5a4fe6c92a80fb
Parents: 04fd03d
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Sep 22 17:06:57 2016 -0700
Committer: Carlo Curino <cu...@apache.org>
Committed: Tue May 16 08:52:37 2017 -0700
----------------------------------------------------------------------
.../policies/FederationPolicyConfigurator.java | 91 -------
.../FederationPolicyInitializationContext.java | 11 +-
.../policies/FederationPolicyManager.java | 126 +++++++++
.../policies/FederationPolicyWriter.java | 45 ----
.../policies/dao/WeightedPolicyInfo.java | 253 +++++++++++++++++++
.../federation/policies/dao/package-info.java | 20 ++
.../router/BaseWeightedRouterPolicy.java | 150 +++++++++++
.../policies/router/LoadBasedRouterPolicy.java | 109 ++++++++
.../policies/router/PriorityRouterPolicy.java | 66 +++++
.../router/UniformRandomRouterPolicy.java | 85 +++++++
.../router/WeightedRandomRouterPolicy.java | 79 ++++++
.../store/records/SubClusterIdInfo.java | 75 ++++++
.../policies/BaseFederationPoliciesTest.java | 155 ++++++++++++
...ionPolicyInitializationContextValidator.java | 17 +-
.../router/TestLoadBasedRouterPolicy.java | 109 ++++++++
.../router/TestPriorityRouterPolicy.java | 87 +++++++
.../router/TestUniformRandomRouterPolicy.java | 65 +++++
.../router/TestWeightedRandomRouterPolicy.java | 127 ++++++++++
.../utils/FederationPoliciesTestUtil.java | 82 +++++-
19 files changed, 1604 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyConfigurator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyConfigurator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyConfigurator.java
deleted file mode 100644
index fdc3857..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyConfigurator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
-
-
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-
-import org.apache.hadoop.yarn.server.federation.policies.router
- .FederationRouterPolicy;
-
-/**
- * Implementors of this interface are capable to instantiate and (re)initalize
- * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} based on
- * a {@link FederationPolicyInitializationContext}. The reason to bind these two
- * policies together is to make sure we remain consistent across the router and
- * amrmproxy policy decisions.
- */
-public interface FederationPolicyConfigurator {
-
- /**
- * If the current instance is compatible, this method returns the same
- * instance of {@link FederationAMRMProxyPolicy} reinitialized with the
- * current context, otherwise a new instance initialized with the current
- * context is provided. If the instance is compatible with the current class
- * the implementors should attempt to reinitalize (retaining state). To affect
- * a complete policy reset oldInstance should be null.
- *
- * @param federationPolicyInitializationContext the current context
- * @param oldInstance the existing (possibly null)
- * instance.
- *
- * @return an updated {@link FederationAMRMProxyPolicy
- }.
- *
- * @throws FederationPolicyInitializationException if the initialization
- * cannot be completed
- * properly. The oldInstance
- * should be still valid in
- * case of failed
- * initialization.
- */
- FederationAMRMProxyPolicy getAMRMPolicy(
- FederationPolicyInitializationContext
- federationPolicyInitializationContext,
- FederationAMRMProxyPolicy oldInstance)
- throws FederationPolicyInitializationException;
-
- /**
- * If the current instance is compatible, this method returns the same
- * instance of {@link FederationRouterPolicy} reinitialized with the current
- * context, otherwise a new instance initialized with the current context is
- * provided. If the instance is compatible with the current class the
- * implementors should attempt to reinitalize (retaining state). To affect a
- * complete policy reset oldInstance shoulb be set to null.
- *
- * @param federationPolicyInitializationContext the current context
- * @param oldInstance the existing (possibly null)
- * instance.
- *
- * @return an updated {@link FederationRouterPolicy}.
- *
- * @throws FederationPolicyInitializationException if the initalization cannot
- * be completed properly. The
- * oldInstance should be still
- * valid in case of failed
- * initialization.
- */
- FederationRouterPolicy getRouterPolicy(
- FederationPolicyInitializationContext
- federationPolicyInitializationContext,
- FederationRouterPolicy oldInstance)
- throws FederationPolicyInitializationException;
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
index 879ccee..9347fd0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
@@ -59,13 +59,12 @@ public class FederationPolicyInitializationContext {
/**
* Setter for the {@link SubClusterPolicyConfiguration}.
*
- * @param federationPolicyConfiguration the
- * {@link SubClusterPolicyConfiguration}
- * to be used for initialization.
+ * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration}
+ * to be used for initialization.
*/
- public void setFederationPolicyConfiguration(
- SubClusterPolicyConfiguration federationPolicyConfiguration) {
- this.federationPolicyConfiguration = federationPolicyConfiguration;
+ public void setSubClusterPolicyConfiguration(
+ SubClusterPolicyConfiguration fedPolicyConfiguration) {
+ this.federationPolicyConfiguration = fedPolicyConfiguration;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
new file mode 100644
index 0000000..e5dba63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+/**
+ *
+ * Implementors need to provide the ability to serliaze a policy and its
+ * configuration as a {@link SubClusterPolicyConfiguration}, as well as
+ * provide (re)initialization mechanics for the underlying
+ * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}.
+ *
+ * The serialization aspects are used by admin APIs or a policy engine to
+ * store a serialized configuration in the {@code FederationStateStore},
+ * while the getters methods are used to obtain a propertly inizialized
+ * policy in the {@code Router} and {@code AMRMProxy} respectively.
+ *
+ * This interface by design binds together
+ * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} and
+ * provide lifecycle support for serialization and deserialization, to reduce
+ * configuration mistakes (combining incompatible policies).
+ *
+ */
+public interface FederationPolicyManager {
+
+ /**
+ * If the current instance is compatible, this method returns the same
+ * instance of {@link FederationAMRMProxyPolicy} reinitialized with the
+ * current context, otherwise a new instance initialized with the current
+ * context is provided. If the instance is compatible with the current class
+ * the implementors should attempt to reinitalize (retaining state). To affect
+ * a complete policy reset oldInstance should be null.
+ *
+ * @param federationPolicyInitializationContext the current context
+ * @param oldInstance the existing (possibly null)
+ * instance.
+ *
+ * @return an updated {@link FederationAMRMProxyPolicy
+ }.
+ *
+ * @throws FederationPolicyInitializationException if the initialization
+ * cannot be completed
+ * properly. The oldInstance
+ * should be still valid in
+ * case of failed
+ * initialization.
+ */
+ FederationAMRMProxyPolicy getAMRMPolicy(
+ FederationPolicyInitializationContext
+ federationPolicyInitializationContext,
+ FederationAMRMProxyPolicy oldInstance)
+ throws FederationPolicyInitializationException;
+
+ /**
+ * If the current instance is compatible, this method returns the same
+ * instance of {@link FederationRouterPolicy} reinitialized with the current
+ * context, otherwise a new instance initialized with the current context is
+ * provided. If the instance is compatible with the current class the
+ * implementors should attempt to reinitalize (retaining state). To affect a
+ * complete policy reset oldInstance shoulb be set to null.
+ *
+ * @param federationPolicyInitializationContext the current context
+ * @param oldInstance the existing (possibly null)
+ * instance.
+ *
+ * @return an updated {@link FederationRouterPolicy}.
+ *
+ * @throws FederationPolicyInitializationException if the initalization cannot
+ * be completed properly. The
+ * oldInstance should be still
+ * valid in case of failed
+ * initialization.
+ */
+ FederationRouterPolicy getRouterPolicy(
+ FederationPolicyInitializationContext
+ federationPolicyInitializationContext,
+ FederationRouterPolicy oldInstance)
+ throws FederationPolicyInitializationException;
+
+ /**
+ * This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
+ * This is to be used when writing a policy object in the federation policy
+ * store.
+ *
+ * @return a valid policy configuration representing this object
+ * parametrization.
+ *
+ * @throws FederationPolicyInitializationException if the current state cannot
+ * be serialized properly
+ */
+ SubClusterPolicyConfiguration serializeConf()
+ throws FederationPolicyInitializationException;
+
+
+ /**
+ * This method returns the queue this policy is configured for.
+ * @return the name of the queue.
+ */
+ String getQueue();
+
+ /**
+ * This methods provides a setter for the queue this policy is specified for.
+ * @param queue the name of the queue.
+ */
+ void setQueue(String queue);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyWriter.java
deleted file mode 100644
index 5034b7e..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyWriter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-/**
- * Implementors of this class are able to serializeConf the configuraiton of a
- * policy as a {@link SubClusterPolicyConfiguration}. This is used during the
- * lifetime of a policy from the admin APIs or policy engine to serializeConf
- * the policy into the policy store.
- */
-public interface FederationPolicyWriter {
-
- /**
- /**
- * This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
- * This is to be used when writing a policy object in the federation policy
- * store.
- *
- * @return a valid policy configuration representing this object
- * parametrization.
- *
- * @throws FederationPolicyInitializationException if the current state cannot
- * be serialized properly
- */
- SubClusterPolicyConfiguration serializeConf()
- throws FederationPolicyInitializationException;
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
new file mode 100644
index 0000000..a0fa37f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
@@ -0,0 +1,253 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.dao;
+
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONMarshaller;
+import com.sun.jersey.api.json.JSONUnmarshaller;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This is a DAO class for the configuration of parameteres for federation
+ * policies. This generalizes several possible configurations as two lists of
+ * {@link SubClusterIdInfo} and corresponding weights as a
+ * {@link Float}. The interpretation of the weight is left to the logic in
+ * the policy.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@XmlRootElement(name = "federation-policy")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class WeightedPolicyInfo {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(WeightedPolicyInfo.class);
+
+ private Map<SubClusterIdInfo, Float> routerPolicyWeights = new HashMap<>();
+ private Map<SubClusterIdInfo, Float> amrmPolicyWeights = new HashMap<>();
+ private float headroomAlpha;
+
+ private static JSONJAXBContext jsonjaxbContext = initContext();
+
+ private static JSONJAXBContext initContext() {
+ try {
+ return new JSONJAXBContext(JSONConfiguration.DEFAULT,
+ WeightedPolicyInfo.class);
+ } catch (JAXBException e) {
+ LOG.error("Error parsing the policy.", e);
+ }
+ return null;
+ }
+
+ public WeightedPolicyInfo() {
+ //JAXB needs this
+ }
+
+ /**
+ * Setter method for Router weights.
+ *
+ * @param policyWeights the router weights.
+ */
+ public void setRouterPolicyWeights(
+ Map<SubClusterIdInfo, Float> policyWeights) {
+ this.routerPolicyWeights = policyWeights;
+ }
+
+ /**
+ * Setter method for ARMRMProxy weights.
+ *
+ * @param policyWeights the amrmproxy weights.
+ */
+ public void setAMRMPolicyWeights(
+ Map<SubClusterIdInfo, Float> policyWeights) {
+ this.amrmPolicyWeights = policyWeights;
+ }
+
+ /**
+ * Getter of the router weights.
+ * @return the router weights.
+ */
+ public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() {
+ return routerPolicyWeights;
+ }
+
+ /**
+ * Getter for AMRMProxy weights.
+ * @return the AMRMProxy weights.
+ */
+ public Map<SubClusterIdInfo, Float> getAMRMPolicyWeights() {
+ return amrmPolicyWeights;
+ }
+
+ /**
+ * Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON
+ * representation.
+ *
+ * @param bb the input byte representation.
+ *
+ * @return the {@link WeightedPolicyInfo} represented.
+ *
+ * @throws FederationPolicyInitializationException if a deserializaiton error
+ * occurs.
+ */
+ public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb)
+ throws FederationPolicyInitializationException {
+
+ if (jsonjaxbContext == null) {
+ throw new FederationPolicyInitializationException("JSONJAXBContext should"
+ + " not be null.");
+ }
+
+ try {
+ JSONUnmarshaller unmarshaller = jsonjaxbContext.createJSONUnmarshaller();
+ final byte[] bytes = new byte[bb.remaining()];
+ bb.get(bytes);
+ String params = new String(bytes, Charset.forName("UTF-8"));
+
+ WeightedPolicyInfo weightedPolicyInfo = unmarshaller
+ .unmarshalFromJSON(new StringReader(params),
+ WeightedPolicyInfo.class);
+ return weightedPolicyInfo;
+ } catch (JAXBException j) {
+ throw new FederationPolicyInitializationException(j);
+ }
+ }
+
+ /**
+ * Converts the policy into a byte array representation in the input {@link
+ * ByteBuffer}.
+ *
+ * @return byte array representation of this policy configuration.
+ *
+ * @throws FederationPolicyInitializationException if a serialization error
+ * occurs.
+ */
+ public ByteBuffer toByteBuffer()
+ throws FederationPolicyInitializationException {
+ if (jsonjaxbContext == null) {
+ throw new FederationPolicyInitializationException("JSONJAXBContext should"
+ + " not be null.");
+ }
+ try {
+ String s = toJSONString();
+ return ByteBuffer.wrap(s.getBytes(Charset.forName("UTF-8")));
+ } catch (JAXBException j) {
+ throw new FederationPolicyInitializationException(j);
+ }
+ }
+
+ private String toJSONString() throws JAXBException {
+ JSONMarshaller marshaller = jsonjaxbContext.createJSONMarshaller();
+ marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
+ StringWriter sw = new StringWriter(256);
+ marshaller.marshallToJSON(this, sw);
+ return sw.toString();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+
+ if (other == null || !other.getClass().equals(this.getClass())) {
+ return false;
+ }
+
+ WeightedPolicyInfo otherPolicy =
+ (WeightedPolicyInfo) other;
+ Map<SubClusterIdInfo, Float> otherAMRMWeights =
+ otherPolicy.getAMRMPolicyWeights();
+ Map<SubClusterIdInfo, Float> otherRouterWeights =
+ otherPolicy.getRouterPolicyWeights();
+
+ boolean amrmWeightsMatch = otherAMRMWeights != null &&
+ getAMRMPolicyWeights() != null &&
+ CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(),
+ getAMRMPolicyWeights().entrySet());
+
+ boolean routerWeightsMatch = otherRouterWeights != null &&
+ getRouterPolicyWeights() != null &&
+ CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(),
+ getRouterPolicyWeights().entrySet());
+
+ return amrmWeightsMatch && routerWeightsMatch;
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * amrmPolicyWeights.hashCode() + routerPolicyWeights.hashCode();
+ }
+
+ /**
+ * Return the parameter headroomAlpha, used by policies that balance
+ * weight-based and load-based considerations in their decisions.
+ *
+ * For policies that use this parameter, values close to 1 indicate that
+ * most of the decision should be based on currently observed headroom from
+ * various sub-clusters, values close to zero, indicate that the decision
+ * should be mostly based on weights and practically ignore current load.
+ *
+ * @return the value of headroomAlpha.
+ */
+ public float getHeadroomAlpha() {
+ return headroomAlpha;
+ }
+
+ /**
+ * Set the parameter headroomAlpha, used by policies that balance
+ * weight-based and load-based considerations in their decisions.
+ *
+ * For policies that use this parameter, values close to 1 indicate that
+ * most of the decision should be based on currently observed headroom from
+ * various sub-clusters, values close to zero, indicate that the decision
+ * should be mostly based on weights and practically ignore current load.
+ *
+ * @param headroomAlpha the value to use for balancing.
+ */
+ public void setHeadroomAlpha(float headroomAlpha) {
+ this.headroomAlpha = headroomAlpha;
+ }
+
+ @Override
+ public String toString() {
+ try {
+ return toJSONString();
+ } catch (JAXBException e) {
+ e.printStackTrace();
+ return "Error serializing to string.";
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
new file mode 100644
index 0000000..43f5b83
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** DAO objects for serializing/deserializing policy configurations. **/
+package org.apache.hadoop.yarn.server.federation.policies.dao;
+
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java
new file mode 100644
index 0000000..e888979
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import java.util.Map;
+
+/**
+ * Abstract class provides common validation of reinitialize(), for all
+ * policies that are "weight-based".
+ */
+public abstract class BaseWeightedRouterPolicy
+ implements FederationRouterPolicy {
+
+ private WeightedPolicyInfo policyInfo = null;
+ private FederationPolicyInitializationContext policyContext;
+
+ public BaseWeightedRouterPolicy() {
+ }
+
+ @Override
+ public void reinitialize(FederationPolicyInitializationContext
+ federationPolicyContext)
+ throws FederationPolicyInitializationException {
+ FederationPolicyInitializationContextValidator
+ .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+ // perform consistency checks
+ WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo
+ .fromByteBuffer(
+ federationPolicyContext.getSubClusterPolicyConfiguration()
+ .getParams());
+
+ // if nothing has changed skip the rest of initialization
+ if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
+ return;
+ }
+
+ validate(newPolicyInfo);
+ setPolicyInfo(newPolicyInfo);
+ this.policyContext = federationPolicyContext;
+ }
+
+ /**
+ * Overridable validation step for the policy configuration.
+ * @param newPolicyInfo the configuration to test.
+ * @throws FederationPolicyInitializationException if the configuration is
+ * not valid.
+ */
+ public void validate(WeightedPolicyInfo newPolicyInfo) throws
+ FederationPolicyInitializationException {
+ if (newPolicyInfo == null) {
+ throw new FederationPolicyInitializationException("The policy to "
+ + "validate should not be null.");
+ }
+ Map<SubClusterIdInfo, Float> newWeights =
+ newPolicyInfo.getRouterPolicyWeights();
+ if (newWeights == null || newWeights.size() < 1) {
+ throw new FederationPolicyInitializationException(
+ "Weight vector cannot be null/empty.");
+ }
+ }
+
+
+ /**
+ * Getter method for the configuration weights.
+ *
+ * @return the {@link WeightedPolicyInfo} representing the policy
+ * configuration.
+ */
+ public WeightedPolicyInfo getPolicyInfo() {
+ return policyInfo;
+ }
+
+ /**
+ * Setter method for the configuration weights.
+ *
+ * @param policyInfo the {@link WeightedPolicyInfo} representing the policy
+ * configuration.
+ */
+ public void setPolicyInfo(
+ WeightedPolicyInfo policyInfo) {
+ this.policyInfo = policyInfo;
+ }
+
+ /**
+ * Getter method for the {@link FederationPolicyInitializationContext}.
+ * @return the context for this policy.
+ */
+ public FederationPolicyInitializationContext getPolicyContext() {
+ return policyContext;
+ }
+
+ /**
+ * Setter method for the {@link FederationPolicyInitializationContext}.
+ * @param policyContext the context to assign to this policy.
+ */
+ public void setPolicyContext(
+ FederationPolicyInitializationContext policyContext) {
+ this.policyContext = policyContext;
+ }
+
+ /**
+ * This methods gets active subclusters map from the {@code
+ * FederationStateStoreFacade} and validate it not being null/empty.
+ *
+ * @return the map of ids to info for all active subclusters.
+ * @throws YarnException if we can't get the list.
+ */
+ protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
+ throws YarnException {
+
+ Map<SubClusterId, SubClusterInfo> activeSubclusters = getPolicyContext()
+ .getFederationStateStoreFacade().getSubClusters(true);
+
+ if (activeSubclusters == null || activeSubclusters.size() < 1) {
+ throw new NoActiveSubclustersException(
+ "Zero active subclusters, cannot pick where to send job.");
+ }
+ return activeSubclusters;
+ }
+
+
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
new file mode 100644
index 0000000..e57709f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+
+import java.util.Map;
+
+/**
+ * This implements a simple load-balancing policy. The policy "weights" are
+ * binary 0/1 values that enable/disable each sub-cluster, and the policy peaks
+ * the sub-cluster with the least load to forward this application.
+ */
+public class LoadBasedRouterPolicy
+ extends BaseWeightedRouterPolicy {
+
+ private static final Log LOG =
+ LogFactory.getLog(LoadBasedRouterPolicy.class);
+
+ @Override
+ public void reinitialize(FederationPolicyInitializationContext
+ federationPolicyContext)
+ throws FederationPolicyInitializationException {
+
+ // remember old policyInfo
+ WeightedPolicyInfo tempPolicy = getPolicyInfo();
+
+ //attempt new initialization
+ super.reinitialize(federationPolicyContext);
+
+ //check extra constraints
+ for (Float weight : getPolicyInfo().getRouterPolicyWeights().values()) {
+ if (weight != 0 && weight != 1) {
+ //reset to old policyInfo if check fails
+ setPolicyInfo(tempPolicy);
+ throw new FederationPolicyInitializationException(
+ this.getClass().getCanonicalName()
+ + " policy expects all weights to be either "
+ + "\"0\" or \"1\"");
+ }
+ }
+ }
+
+ @Override
+ public SubClusterId getHomeSubcluster(
+ ApplicationSubmissionContext appSubmissionContext)
+ throws YarnException {
+
+ Map<SubClusterId, SubClusterInfo> activeSubclusters =
+ getActiveSubclusters();
+
+ Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
+ .getRouterPolicyWeights();
+ SubClusterIdInfo chosen = null;
+ long currBestMem = -1;
+ for (Map.Entry<SubClusterId, SubClusterInfo> entry :
+ activeSubclusters
+ .entrySet()) {
+ SubClusterIdInfo id = new SubClusterIdInfo(entry.getKey());
+ if (weights.containsKey(id) && weights.get(id) > 0) {
+ long availableMemory = getAvailableMemory(entry.getValue());
+ if (availableMemory > currBestMem) {
+ currBestMem = availableMemory;
+ chosen = id;
+ }
+ }
+ }
+
+ return chosen.toId();
+ }
+
+ private long getAvailableMemory(SubClusterInfo value)
+ throws YarnException {
+ try {
+ long mem = -1;
+ JSONObject obj = new JSONObject(value.getCapability());
+ mem = obj.getJSONObject("clusterMetrics").getLong("availableMB");
+ return mem;
+ } catch (JSONException j) {
+ throw new YarnException("FederationSubCluserInfo cannot be parsed", j);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
new file mode 100644
index 0000000..a8ac5f7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import java.util.Map;
+
+/**
+ * This implements a policy that interprets "weights" as a ordered list of
+ * preferences among sub-clusters. Highest weight among active subclusters is
+ * chosen.
+ */
+public class PriorityRouterPolicy
+ extends BaseWeightedRouterPolicy {
+
+ private static final Log LOG =
+ LogFactory.getLog(PriorityRouterPolicy.class);
+
+ @Override
+ public SubClusterId getHomeSubcluster(
+ ApplicationSubmissionContext appSubmissionContext)
+ throws YarnException {
+
+ Map<SubClusterId, SubClusterInfo> activeSubclusters =
+ getActiveSubclusters();
+
+ // This finds the sub-cluster with the highest weight among the
+ // currently active ones.
+ Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
+ .getRouterPolicyWeights();
+ SubClusterId chosen = null;
+ Float currentBest = Float.MIN_VALUE;
+ for (SubClusterId id : activeSubclusters.keySet()) {
+ SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
+ if (weights.containsKey(idInfo) && weights.get(idInfo) > currentBest) {
+ currentBest = weights.get(idInfo);
+ chosen = id;
+ }
+ }
+
+ return chosen;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
new file mode 100644
index 0000000..1774961
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * This simple policy picks at uniform random among any of the currently active
+ * subclusters. This policy is easy to use and good for testing.
+ *
+ * NOTE: this is "almost" subsumed by the {@code WeightedRandomRouterPolicy}.
+ * Behavior only diverges when there are active sub-clusters that are not part
+ * of the "weights", in which case the {@link UniformRandomRouterPolicy} send
+ * load to them, while {@code WeightedRandomRouterPolicy} does not.
+ */
+public class UniformRandomRouterPolicy extends BaseWeightedRouterPolicy {
+
+ private Random rand;
+
+ public UniformRandomRouterPolicy() {
+ rand = new Random(System.currentTimeMillis());
+ }
+
+ @Override
+ public void reinitialize(
+ FederationPolicyInitializationContext federationPolicyContext)
+ throws FederationPolicyInitializationException {
+ FederationPolicyInitializationContextValidator
+ .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+ //note: this overrides BaseWeighterRouterPolicy and ignores the weights
+
+ setPolicyContext(federationPolicyContext);
+ }
+
+ /**
+ * Simply picks a random active subcluster to start the AM (this does NOT
+ * depend on the weights in the policy).
+ *
+ * @param appSubmissionContext the context for the app being submitted
+ * (ignored).
+ *
+ * @return a randomly chosen subcluster.
+ *
+ * @throws YarnException if there are no active subclusters.
+ */
+ public SubClusterId getHomeSubcluster(
+ ApplicationSubmissionContext appSubmissionContext)
+ throws YarnException {
+
+ Map<SubClusterId, SubClusterInfo> activeSubclusters =
+ getActiveSubclusters();
+
+ List<SubClusterId> list =
+ new ArrayList<>(activeSubclusters.keySet());
+ return list.get(rand.nextInt(list.size()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
new file mode 100644
index 0000000..0777677
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * This policy implements a weighted random sample among currently active
+ * sub-clusters.
+ */
+public class WeightedRandomRouterPolicy
+ extends BaseWeightedRouterPolicy {
+
+ private static final Log LOG =
+ LogFactory.getLog(WeightedRandomRouterPolicy.class);
+ private Random rand = new Random(System.currentTimeMillis());
+
+ @Override
+ public SubClusterId getHomeSubcluster(
+ ApplicationSubmissionContext appSubmissionContext)
+ throws YarnException {
+
+ Map<SubClusterId, SubClusterInfo> activeSubclusters =
+ getActiveSubclusters();
+
+ // note: we cannot pre-compute the weights, as the set of activeSubcluster
+ // changes dynamically (and this would unfairly spread the load to
+ // sub-clusters adjacent to an inactive one), hence we need to count/scan
+ // the list and based on weight pick the next sub-cluster.
+ Map<SubClusterIdInfo, Float> weights = getPolicyInfo()
+ .getRouterPolicyWeights();
+
+ float totActiveWeight = 0;
+ for(Map.Entry<SubClusterIdInfo, Float> entry : weights.entrySet()){
+ if(entry.getKey()!=null && activeSubclusters.containsKey(entry.getKey()
+ .toId())){
+ totActiveWeight += entry.getValue();
+ }
+ }
+ float lookupValue = rand.nextFloat() * totActiveWeight;
+
+ for (SubClusterId id : activeSubclusters.keySet()) {
+ SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
+ if (weights.containsKey(idInfo)) {
+ lookupValue -= weights.get(idInfo);
+ }
+ if (lookupValue <= 0) {
+ return id;
+ }
+ }
+ //should never happen
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java
new file mode 100644
index 0000000..e2260a1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterIdInfo.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.store.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * This class represent a sub-cluster identifier in the JSON representation
+ * of the policy configuration.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+@XmlRootElement(name = "federation-policy")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class SubClusterIdInfo {
+
+ private String id;
+
+ public SubClusterIdInfo() {
+ //JAXB needs this
+ }
+
+ public SubClusterIdInfo(String subClusterId) {
+ this.id = subClusterId;
+ }
+
+ public SubClusterIdInfo(SubClusterId subClusterId) {
+ this.id = subClusterId.getId();
+ }
+
+ /**
+ * Get the sub-cluster identifier as {@link SubClusterId}.
+ * @return the sub-cluster id.
+ */
+ public SubClusterId toId() {
+ return SubClusterId.newInstance(id);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof SubClusterIdInfo) {
+ if (((SubClusterIdInfo) other).id.equals(this.id)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
new file mode 100644
index 0000000..8da92b9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import static org.mockito.Mockito.mock;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Test;
+
+/**
+ * Base class for policies tests, tests for common reinitialization cases.
+ */
+public abstract class BaseFederationPoliciesTest {
+
+ private ConfigurableFederationPolicy policy;
+ private WeightedPolicyInfo policyInfo;
+ private Map<SubClusterId, SubClusterInfo> activeSubclusters = new HashMap<>();
+ private FederationPolicyInitializationContext federationPolicyContext;
+ private ApplicationSubmissionContext applicationSubmissionContext =
+ mock(ApplicationSubmissionContext.class);
+ private Random rand = new Random();
+
+ @Test
+ public void testReinitilialize() throws YarnException {
+ FederationPolicyInitializationContext fpc =
+ new FederationPolicyInitializationContext();
+ ByteBuffer buf = getPolicyInfo().toByteBuffer();
+ fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
+ .newInstance("queue1", getPolicy().getClass().getCanonicalName(), buf));
+ fpc.setFederationSubclusterResolver(
+ FederationPoliciesTestUtil.initResolver());
+ fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
+ getPolicy().reinitialize(fpc);
+ }
+
+ @Test(expected = FederationPolicyInitializationException.class)
+ public void testReinitilializeBad1() throws YarnException {
+ getPolicy().reinitialize(null);
+ }
+
+ @Test(expected = FederationPolicyInitializationException.class)
+ public void testReinitilializeBad2() throws YarnException {
+ FederationPolicyInitializationContext fpc =
+ new FederationPolicyInitializationContext();
+ getPolicy().reinitialize(fpc);
+ }
+
+ @Test(expected = FederationPolicyInitializationException.class)
+ public void testReinitilializeBad3() throws YarnException {
+ FederationPolicyInitializationContext fpc =
+ new FederationPolicyInitializationContext();
+ ByteBuffer buf = mock(ByteBuffer.class);
+ fpc.setSubClusterPolicyConfiguration(SubClusterPolicyConfiguration
+ .newInstance("queue1", "WrongPolicyName", buf));
+ fpc.setFederationSubclusterResolver(
+ FederationPoliciesTestUtil.initResolver());
+ fpc.setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
+ getPolicy().reinitialize(fpc);
+ }
+
+ @Test(expected = NoActiveSubclustersException.class)
+ public void testNoSubclusters() throws YarnException {
+ // empty the activeSubclusters map
+ FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+ getPolicyInfo(), new HashMap<>());
+
+ ConfigurableFederationPolicy currentPolicy = getPolicy();
+ if (currentPolicy instanceof FederationRouterPolicy) {
+ ((FederationRouterPolicy) currentPolicy)
+ .getHomeSubcluster(getApplicationSubmissionContext());
+ }
+ }
+
+ public ConfigurableFederationPolicy getPolicy() {
+ return policy;
+ }
+
+ public void setPolicy(ConfigurableFederationPolicy policy) {
+ this.policy = policy;
+ }
+
+ public WeightedPolicyInfo getPolicyInfo() {
+ return policyInfo;
+ }
+
+ public void setPolicyInfo(WeightedPolicyInfo policyInfo) {
+ this.policyInfo = policyInfo;
+ }
+
+ public Map<SubClusterId, SubClusterInfo> getActiveSubclusters() {
+ return activeSubclusters;
+ }
+
+ public void setActiveSubclusters(
+ Map<SubClusterId, SubClusterInfo> activeSubclusters) {
+ this.activeSubclusters = activeSubclusters;
+ }
+
+ public FederationPolicyInitializationContext getFederationPolicyContext() {
+ return federationPolicyContext;
+ }
+
+ public void setFederationPolicyContext(
+ FederationPolicyInitializationContext federationPolicyContext) {
+ this.federationPolicyContext = federationPolicyContext;
+ }
+
+ public ApplicationSubmissionContext getApplicationSubmissionContext() {
+ return applicationSubmissionContext;
+ }
+
+ public void setApplicationSubmissionContext(
+ ApplicationSubmissionContext applicationSubmissionContext) {
+ this.applicationSubmissionContext = applicationSubmissionContext;
+ }
+
+ public Random getRand() {
+ return rand;
+ }
+
+ public void setRand(Random rand) {
+ this.rand = rand;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
index 4ec04d5..e840b3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
@@ -77,7 +77,7 @@ public class TestFederationPolicyInitializationContextValidator {
@Test(expected = FederationPolicyInitializationException.class)
public void nullConf() throws Exception {
- context.setFederationPolicyConfiguration(null);
+ context.setSubClusterPolicyConfiguration(null);
FederationPolicyInitializationContextValidator.validate(context,
MockPolicyManager.class.getCanonicalName());
}
@@ -96,8 +96,8 @@ public class TestFederationPolicyInitializationContextValidator {
MockPolicyManager.class.getCanonicalName());
}
- private class MockPolicyManager
- implements FederationPolicyWriter, FederationPolicyConfigurator {
+ private class MockPolicyManager implements FederationPolicyManager {
+
@Override
public FederationAMRMProxyPolicy getAMRMPolicy(
FederationPolicyInitializationContext
@@ -123,6 +123,17 @@ public class TestFederationPolicyInitializationContextValidator {
return SubClusterPolicyConfiguration
.newInstance("queue1", this.getClass().getCanonicalName(), buf);
}
+
+ @Override
+ public String getQueue() {
+ return "default";
+ }
+
+ @Override
+ public void setQueue(String queue) {
+
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
new file mode 100644
index 0000000..9e94f72
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestLoadBasedRouterPolicy.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple test class for the {@link LoadBasedRouterPolicy}. Test that the
+ * load is properly considered for allocation.
+ */
+public class TestLoadBasedRouterPolicy extends BaseFederationPoliciesTest {
+
+ @Before
+ public void setUp() throws Exception {
+ setPolicy(new LoadBasedRouterPolicy());
+ setPolicyInfo(new WeightedPolicyInfo());
+ Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+ Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+
+ // simulate 20 active subclusters
+ for (int i = 0; i < 20; i++) {
+ SubClusterIdInfo sc =
+ new SubClusterIdInfo(String.format("sc%02d", i));
+ SubClusterInfo federationSubClusterInfo =
+ SubClusterInfo.newInstance(sc.toId(), null, null, null, null, -1,
+ SubClusterState.SC_RUNNING, -1,
+ generateClusterMetricsInfo(i));
+ getActiveSubclusters().put(sc.toId(), federationSubClusterInfo);
+ float weight = getRand().nextInt(2);
+ if (i == 5) {
+ weight = 1.0f;
+ }
+
+ // 5% chance we omit one of the weights
+ if (i <= 5 || getRand().nextFloat() > 0.05f) {
+ routerWeights.put(sc, weight);
+ amrmWeights.put(sc, weight);
+ }
+ }
+ getPolicyInfo().setRouterPolicyWeights(routerWeights);
+ getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
+
+ FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+ getPolicyInfo(), getActiveSubclusters());
+
+ }
+
+ private String generateClusterMetricsInfo(int id) {
+
+ long mem = 1024 * getRand().nextInt(277 * 100 - 1);
+ //plant a best cluster
+ if (id == 5) {
+ mem = 1024 * 277 * 100;
+ }
+ String clusterMetrics =
+ "{\"clusterMetrics\":{\"appsSubmitted\":65," + "\"appsCompleted\":64,"
+ + "\"appsPending\":0,\"appsRunning\":0,\"appsFailed\":0,"
+ + "\"appsKilled\":1,\"reservedMB\":0,\"availableMB\":" + mem + ","
+ + "\"allocatedMB\":0,\"reservedVirtualCores\":0,"
+ + "\"availableVirtualCores\":2216,\"allocatedVirtualCores\":0,"
+ + "\"containersAllocated\":0,\"containersReserved\":0,"
+ + "\"containersPending\":0,\"totalMB\":28364800,"
+ + "\"totalVirtualCores\":2216,\"totalNodes\":278,\"lostNodes\":1,"
+ + "\"unhealthyNodes\":0,\"decommissionedNodes\":0,"
+ + "\"rebootedNodes\":0,\"activeNodes\":277}}\n";
+
+ return clusterMetrics;
+
+ }
+
+ @Test
+ public void testLoadIsRespected() throws YarnException {
+
+ SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
+ .getHomeSubcluster(getApplicationSubmissionContext());
+
+ // check the "planted" best cluster is chosen
+ Assert.assertEquals("sc05", chosen.getId());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
new file mode 100644
index 0000000..ff5175d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestPriorityRouterPolicy.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Simple test class for the {@link PriorityRouterPolicy}. Tests that the
+ * weights are correctly used for ordering the choice of sub-clusters.
+ */
+public class TestPriorityRouterPolicy extends BaseFederationPoliciesTest {
+
+ @Before
+ public void setUp() throws Exception {
+ setPolicy(new PriorityRouterPolicy());
+ setPolicyInfo(new WeightedPolicyInfo());
+ Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+ Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+
+ // simulate 20 subclusters with a 5% chance of being inactive
+ for (int i = 0; i < 20; i++) {
+ SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
+
+ // with 5% omit a subcluster
+ if (getRand().nextFloat() < 0.95f || i == 5) {
+ SubClusterInfo sci = mock(SubClusterInfo.class);
+ when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+ when(sci.getSubClusterId()).thenReturn(sc.toId());
+ getActiveSubclusters().put(sc.toId(), sci);
+ }
+ float weight = getRand().nextFloat();
+ if (i == 5) {
+ weight = 1.1f; // guaranteed to be the largest.
+ }
+
+ // 5% chance we omit one of the weights
+ if (i <= 5 || getRand().nextFloat() > 0.05f) {
+ routerWeights.put(sc, weight);
+ amrmWeights.put(sc, weight);
+ }
+ }
+ getPolicyInfo().setRouterPolicyWeights(routerWeights);
+ getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
+ FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+ getPolicyInfo(),
+ getActiveSubclusters());
+
+ }
+
+ @Test
+ public void testPickLowestWeight() throws YarnException {
+ SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
+ .getHomeSubcluster(getApplicationSubmissionContext());
+ Assert.assertEquals("sc5", chosen.getId());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
new file mode 100644
index 0000000..ac41ab5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestUniformRandomRouterPolicy.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Simple test class for the {@link UniformRandomRouterPolicy}. Tests that one
+ * of the active subcluster is chosen.
+ */
+public class TestUniformRandomRouterPolicy extends BaseFederationPoliciesTest {
+
+ @Before
+ public void setUp() throws Exception {
+ setPolicy(new UniformRandomRouterPolicy());
+ // needed for base test to work
+ setPolicyInfo(mock(WeightedPolicyInfo.class));
+ for (int i = 1; i <= 2; i++) {
+ SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
+ SubClusterInfo sci = mock(SubClusterInfo.class);
+ when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+ when(sci.getSubClusterId()).thenReturn(sc.toId());
+ getActiveSubclusters().put(sc.toId(), sci);
+ }
+
+ FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+ mock(WeightedPolicyInfo.class), getActiveSubclusters());
+ }
+
+ @Test
+ public void testOneSubclusterIsChosen() throws YarnException {
+ SubClusterId chosen = ((FederationRouterPolicy) getPolicy())
+ .getHomeSubcluster(getApplicationSubmissionContext());
+ Assert.assertTrue(getActiveSubclusters().keySet().contains(chosen));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94a87a4d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
new file mode 100644
index 0000000..a612685
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/router/TestWeightedRandomRouterPolicy.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.BaseFederationPoliciesTest;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Simple test class for the {@link WeightedRandomRouterPolicy}. Generate large
+ * number of randomized tests to check we are weighiting correctly even if
+ * clusters go inactive.
+ */
+public class TestWeightedRandomRouterPolicy extends BaseFederationPoliciesTest {
+
+ @Before
+ public void setUp() throws Exception {
+ setPolicy(new WeightedRandomRouterPolicy());
+ setPolicyInfo(new WeightedPolicyInfo());
+ Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+ Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+
+ // simulate 20 subclusters with a 5% chance of being inactive
+ for (int i = 0; i < 20; i++) {
+ SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
+ // with 5% omit a subcluster
+ if (getRand().nextFloat() < 0.95f) {
+ SubClusterInfo sci = mock(SubClusterInfo.class);
+ when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+ when(sci.getSubClusterId()).thenReturn(sc.toId());
+ getActiveSubclusters().put(sc.toId(), sci);
+ }
+ // 5% chance we omit one of the weights
+ float weight = getRand().nextFloat();
+ if (i <= 5 || getRand().nextFloat() > 0.05f) {
+ routerWeights.put(sc, weight);
+ amrmWeights.put(sc, weight);
+ }
+ }
+ getPolicyInfo().setRouterPolicyWeights(routerWeights);
+ getPolicyInfo().setAMRMPolicyWeights(amrmWeights);
+
+ FederationPoliciesTestUtil.initializePolicyContext(getPolicy(),
+ getPolicyInfo(),
+ getActiveSubclusters());
+
+ }
+
+ @Test
+ public void testClusterChosenWithRightProbability() throws YarnException {
+
+ Map<SubClusterId, AtomicLong> counter = new HashMap<>();
+ for (SubClusterIdInfo id : getPolicyInfo().getRouterPolicyWeights()
+ .keySet()) {
+ counter.put(id.toId(), new AtomicLong(0));
+ }
+
+ float numberOfDraws = 1000000;
+
+ for (float i = 0; i < numberOfDraws; i++) {
+ SubClusterId chosenId = ((FederationRouterPolicy) getPolicy()).
+ getHomeSubcluster(getApplicationSubmissionContext());
+ counter.get(chosenId).incrementAndGet();
+ }
+
+ float totalActiveWeight = 0;
+ for (SubClusterId id : getActiveSubclusters().keySet()) {
+ SubClusterIdInfo idInfo = new SubClusterIdInfo(id);
+ if (getPolicyInfo().getRouterPolicyWeights().containsKey(idInfo)) {
+ totalActiveWeight +=
+ getPolicyInfo().getRouterPolicyWeights().get(idInfo);
+ }
+ }
+
+ for (Map.Entry<SubClusterId, AtomicLong> counterEntry : counter
+ .entrySet()) {
+ float expectedWeight = getPolicyInfo().getRouterPolicyWeights()
+ .get(new SubClusterIdInfo(counterEntry.getKey())) / totalActiveWeight;
+ float actualWeight = counterEntry.getValue().floatValue() / numberOfDraws;
+
+ // make sure that the weights is respected among active subclusters
+ // and no jobs are routed to inactive subclusters.
+ if (getActiveSubclusters().containsKey(counterEntry.getKey())) {
+ Assert.assertTrue(
+ "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
+ + " expected weight: " + expectedWeight, expectedWeight == 0 ||
+ (actualWeight / expectedWeight) < 1.1
+ && (actualWeight / expectedWeight) > 0.9);
+ } else {
+ Assert.assertTrue(
+ "Id " + counterEntry.getKey() + " Actual weight: " + actualWeight
+ + " expected weight: " + expectedWeight, actualWeight == 0);
+
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org