You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/02/24 19:34:50 UTC
[23/50] [abbrv] hadoop git commit: YARN-5325. Stateless AMRMProxy
policies implementation. (Carlo Curino via Subru).
YARN-5325. Stateless AMRMProxy policies implementation. (Carlo Curino via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1793757d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1793757d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1793757d
Branch: refs/heads/YARN-2915
Commit: 1793757ddb07118d0c007b84db18b05b3cee25c6
Parents: 5f03e0f
Author: Subru Krishnan <su...@apache.org>
Authored: Thu Oct 13 17:59:13 2016 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Fri Feb 24 11:31:44 2017 -0800
----------------------------------------------------------------------
.../AbstractConfigurableFederationPolicy.java | 155 +++++
.../policies/ConfigurableFederationPolicy.java | 9 +-
.../FederationPolicyInitializationContext.java | 37 +-
...ionPolicyInitializationContextValidator.java | 28 +-
.../policies/FederationPolicyManager.java | 59 +-
.../amrmproxy/AbstractAMRMProxyPolicy.java | 47 ++
.../amrmproxy/BroadcastAMRMProxyPolicy.java | 85 +++
.../amrmproxy/FederationAMRMProxyPolicy.java | 25 +-
.../LocalityMulticastAMRMProxyPolicy.java | 583 +++++++++++++++++++
.../policies/amrmproxy/package-info.java | 1 -
.../policies/dao/WeightedPolicyInfo.java | 180 +++---
.../federation/policies/dao/package-info.java | 1 -
.../policies/exceptions/package-info.java | 1 -
.../federation/policies/package-info.java | 1 -
.../policies/router/AbstractRouterPolicy.java | 47 ++
.../router/BaseWeightedRouterPolicy.java | 150 -----
.../policies/router/FederationRouterPolicy.java | 5 +-
.../policies/router/LoadBasedRouterPolicy.java | 36 +-
.../policies/router/PriorityRouterPolicy.java | 19 +-
.../router/UniformRandomRouterPolicy.java | 28 +-
.../router/WeightedRandomRouterPolicy.java | 32 +-
.../policies/router/package-info.java | 1 -
.../resolver/AbstractSubClusterResolver.java | 4 +-
.../policies/BaseFederationPoliciesTest.java | 28 +-
...ionPolicyInitializationContextValidator.java | 25 +-
.../TestBroadcastAMRMProxyFederationPolicy.java | 112 ++++
.../TestLocalityMulticastAMRMProxyPolicy.java | 566 ++++++++++++++++++
.../router/TestLoadBasedRouterPolicy.java | 18 +-
.../router/TestPriorityRouterPolicy.java | 15 +-
.../router/TestWeightedRandomRouterPolicy.java | 35 +-
.../utils/FederationPoliciesTestUtil.java | 64 ++
.../src/test/resources/nodes | 6 +-
32 files changed, 1950 insertions(+), 453 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java
new file mode 100644
index 0000000..4cb9bbe
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractConfigurableFederationPolicy.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+/**
+ * Base abstract class for a weighted {@link ConfigurableFederationPolicy}.
+ */
+public abstract class AbstractConfigurableFederationPolicy
+ implements ConfigurableFederationPolicy {
+
+ private WeightedPolicyInfo policyInfo = null;
+ private FederationPolicyInitializationContext policyContext;
+ private boolean isDirty;
+
+ public AbstractConfigurableFederationPolicy() {
+ }
+
+ @Override
+ public void reinitialize(
+ FederationPolicyInitializationContext initializationContext)
+ throws FederationPolicyInitializationException {
+ isDirty = true;
+ FederationPolicyInitializationContextValidator
+ .validate(initializationContext, this.getClass().getCanonicalName());
+
+ // deserialize the WeightedPolicyInfo carried by the new configuration
+ WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo.fromByteBuffer(
+ initializationContext.getSubClusterPolicyConfiguration().getParams());
+
+ // if the policy info is unchanged skip the rest (the stored context is
+ // not replaced) and tell subclasses via isDirty that the reinit was free.
+ if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
+ isDirty = false;
+ return;
+ }
+
+ validate(newPolicyInfo);
+ setPolicyInfo(newPolicyInfo);
+ this.policyContext = initializationContext;
+ }
+
+ /**
+ * Overridable validation step; this base version only rejects null input.
+ *
+ * @param newPolicyInfo the configuration to test.
+ *
+ * @throws FederationPolicyInitializationException if the configuration is not
+ * valid.
+ */
+ public void validate(WeightedPolicyInfo newPolicyInfo)
+ throws FederationPolicyInitializationException {
+ if (newPolicyInfo == null) {
+ throw new FederationPolicyInitializationException(
+ "The policy to " + "validate should not be null.");
+ }
+ }
+
+ /**
+ * Returns true if the last reinitialization required actual changes, false
+ * if it was "free" because the policy info has not changed. This is used by
+ * subclasses overriding reinitialize and calling super.reinitialize() to know
+ * whether to quit early.
+ *
+ * @return whether more work is needed to initialize.
+ */
+ public boolean getIsDirty() {
+ return isDirty;
+ }
+
+ /**
+ * Getter method for the configuration weights.
+ *
+ * @return the {@link WeightedPolicyInfo} representing the policy
+ * configuration.
+ */
+ public WeightedPolicyInfo getPolicyInfo() {
+ return policyInfo;
+ }
+
+ /**
+ * Setter method for the configuration weights.
+ *
+ * @param policyInfo the {@link WeightedPolicyInfo} representing the policy
+ * configuration.
+ */
+ public void setPolicyInfo(WeightedPolicyInfo policyInfo) {
+ this.policyInfo = policyInfo;
+ }
+
+ /**
+ * Getter method for the {@link FederationPolicyInitializationContext}.
+ *
+ * @return the context for this policy.
+ */
+ public FederationPolicyInitializationContext getPolicyContext() {
+ return policyContext;
+ }
+
+ /**
+ * Setter method for the {@link FederationPolicyInitializationContext}.
+ *
+ * @param policyContext the context to assign to this policy.
+ */
+ public void setPolicyContext(
+ FederationPolicyInitializationContext policyContext) {
+ this.policyContext = policyContext;
+ }
+
+ /**
+ * This method gets the active subclusters map from the {@code
+ * FederationStateStoreFacade} and validates that it is not null/empty.
+ *
+ * @return the map of ids to info for all active subclusters.
+ *
+ * @throws YarnException if we can't get the map.
+ */
+ protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
+ throws YarnException {
+
+ Map<SubClusterId, SubClusterInfo> activeSubclusters =
+ getPolicyContext().getFederationStateStoreFacade().getSubClusters(true);
+
+ if (activeSubclusters == null || activeSubclusters.size() < 1) {
+ throw new NoActiveSubclustersException(
+ "Zero active subclusters, cannot pick where to send job.");
+ }
+ return activeSubclusters;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java
index fd6ceea..5245772 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/ConfigurableFederationPolicy.java
@@ -31,14 +31,11 @@ public interface ConfigurableFederationPolicy {
* policies. The implementor should provide try-n-swap semantics, and retain
* state if possible.
*
- * @param federationPolicyInitializationContext the new context to provide to
- * implementor.
+ * @param policyContext the new context to provide to implementor.
*
* @throws FederationPolicyInitializationException in case the initialization
- * fails.
+ * fails.
*/
- void reinitialize(
- FederationPolicyInitializationContext
- federationPolicyInitializationContext)
+ void reinitialize(FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
index 9347fd0..46dd6eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.federation.policies;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -30,6 +31,7 @@ public class FederationPolicyInitializationContext {
private SubClusterPolicyConfiguration federationPolicyConfiguration;
private SubClusterResolver federationSubclusterResolver;
private FederationStateStoreFacade federationStateStoreFacade;
+ private SubClusterId homeSubcluster;
public FederationPolicyInitializationContext() {
federationPolicyConfiguration = null;
@@ -37,20 +39,19 @@ public class FederationPolicyInitializationContext {
federationStateStoreFacade = null;
}
- public FederationPolicyInitializationContext(SubClusterPolicyConfiguration
- policy, SubClusterResolver resolver, FederationStateStoreFacade
- storeFacade) {
+ public FederationPolicyInitializationContext(
+ SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
+ FederationStateStoreFacade storeFacade) {
this.federationPolicyConfiguration = policy;
this.federationSubclusterResolver = resolver;
this.federationStateStoreFacade = storeFacade;
}
-
/**
* Getter for the {@link SubClusterPolicyConfiguration}.
*
* @return the {@link SubClusterPolicyConfiguration} to be used for
- * initialization.
+ * initialization.
*/
public SubClusterPolicyConfiguration getSubClusterPolicyConfiguration() {
return federationPolicyConfiguration;
@@ -59,8 +60,8 @@ public class FederationPolicyInitializationContext {
/**
* Setter for the {@link SubClusterPolicyConfiguration}.
*
- * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration}
- * to be used for initialization.
+ * @param fedPolicyConfiguration the {@link SubClusterPolicyConfiguration} to
+ * be used for initialization.
*/
public void setSubClusterPolicyConfiguration(
SubClusterPolicyConfiguration fedPolicyConfiguration) {
@@ -80,7 +81,7 @@ public class FederationPolicyInitializationContext {
* Setter for the {@link SubClusterResolver}.
*
* @param federationSubclusterResolver the {@link SubClusterResolver} to be
- * used for initialization.
+ * used for initialization.
*/
public void setFederationSubclusterResolver(
SubClusterResolver federationSubclusterResolver) {
@@ -105,4 +106,24 @@ public class FederationPolicyInitializationContext {
FederationStateStoreFacade federationStateStoreFacade) {
this.federationStateStoreFacade = federationStateStoreFacade;
}
+
+ /**
+ * Returns the current home sub-cluster. Useful for default policy behaviors.
+ *
+ * @return the home sub-cluster held by this context.
+ */
+ public SubClusterId getHomeSubcluster() {
+ return homeSubcluster;
+ }
+
+ /**
+ * Sets the home sub-cluster in this context. Useful for default policy
+ * behaviors.
+ *
+ * @param homeSubcluster the home sub-cluster to store in this context.
+ */
+ public void setHomeSubcluster(SubClusterId homeSubcluster) {
+ this.homeSubcluster = homeSubcluster;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
index 31f83d4..1b83bbc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContextValidator.java
@@ -25,50 +25,44 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
public final class FederationPolicyInitializationContextValidator {
private FederationPolicyInitializationContextValidator() {
- //disable constructor per checkstyle
+ // disable constructor per checkstyle
}
public static void validate(
- FederationPolicyInitializationContext
- federationPolicyInitializationContext,
- String myType) throws FederationPolicyInitializationException {
+ FederationPolicyInitializationContext policyContext, String myType)
+ throws FederationPolicyInitializationException {
if (myType == null) {
- throw new FederationPolicyInitializationException("The myType parameter"
- + " should not be null.");
+ throw new FederationPolicyInitializationException(
+ "The myType parameter" + " should not be null.");
}
- if (federationPolicyInitializationContext == null) {
+ if (policyContext == null) {
throw new FederationPolicyInitializationException(
"The FederationPolicyInitializationContext provided is null. Cannot"
- + " reinitalize "
- + "successfully.");
+ + " reinitalize " + "successfully.");
}
- if (federationPolicyInitializationContext.getFederationStateStoreFacade()
- == null) {
+ if (policyContext.getFederationStateStoreFacade() == null) {
throw new FederationPolicyInitializationException(
"The FederationStateStoreFacade provided is null. Cannot"
+ " reinitalize successfully.");
}
- if (federationPolicyInitializationContext.getFederationSubclusterResolver()
- == null) {
+ if (policyContext.getFederationSubclusterResolver() == null) {
throw new FederationPolicyInitializationException(
"The FederationStateStoreFacase provided is null. Cannot"
+ " reinitalize successfully.");
}
- if (federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
- == null) {
+ if (policyContext.getSubClusterPolicyConfiguration() == null) {
throw new FederationPolicyInitializationException(
"The FederationSubclusterResolver provided is null. Cannot "
+ "reinitalize successfully.");
}
String intendedType =
- federationPolicyInitializationContext.getSubClusterPolicyConfiguration()
- .getType();
+ policyContext.getSubClusterPolicyConfiguration().getType();
if (!myType.equals(intendedType)) {
throw new FederationPolicyInitializationException(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
index e5dba63..39fdba3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
@@ -25,19 +25,19 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyCo
/**
*
* Implementors need to provide the ability to serliaze a policy and its
- * configuration as a {@link SubClusterPolicyConfiguration}, as well as
- * provide (re)initialization mechanics for the underlying
+ * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide
+ * (re)initialization mechanics for the underlying
* {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}.
*
- * The serialization aspects are used by admin APIs or a policy engine to
- * store a serialized configuration in the {@code FederationStateStore},
- * while the getters methods are used to obtain a propertly inizialized
- * policy in the {@code Router} and {@code AMRMProxy} respectively.
+ * The serialization aspects are used by admin APIs or a policy engine to store
+ * a serialized configuration in the {@code FederationStateStore}, while the
+ * getter methods are used to obtain a properly initialized policy in the
+ * {@code Router} and {@code AMRMProxy} respectively.
*
- * This interface by design binds together
- * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy} and
- * provide lifecycle support for serialization and deserialization, to reduce
- * configuration mistakes (combining incompatible policies).
+ * This interface by design binds together {@link FederationAMRMProxyPolicy} and
+ * {@link FederationRouterPolicy} and provides lifecycle support for
+ * serialization and deserialization, to reduce configuration mistakes
+ * (combining incompatible policies).
*
*/
public interface FederationPolicyManager {
@@ -50,23 +50,17 @@ public interface FederationPolicyManager {
* the implementors should attempt to reinitalize (retaining state). To affect
* a complete policy reset oldInstance should be null.
*
- * @param federationPolicyInitializationContext the current context
- * @param oldInstance the existing (possibly null)
- * instance.
+ * @param policyContext the current context
+ * @param oldInstance the existing (possibly null) instance.
*
- * @return an updated {@link FederationAMRMProxyPolicy
- }.
+ * @return an updated {@link FederationAMRMProxyPolicy}.
*
* @throws FederationPolicyInitializationException if the initialization
- * cannot be completed
- * properly. The oldInstance
- * should be still valid in
- * case of failed
- * initialization.
+ * cannot be completed properly. The oldInstance should be still
+ * valid in case of failed initialization.
*/
FederationAMRMProxyPolicy getAMRMPolicy(
- FederationPolicyInitializationContext
- federationPolicyInitializationContext,
+ FederationPolicyInitializationContext policyContext,
FederationAMRMProxyPolicy oldInstance)
throws FederationPolicyInitializationException;
@@ -78,21 +72,17 @@ public interface FederationPolicyManager {
* implementors should attempt to reinitalize (retaining state). To affect a
* complete policy reset oldInstance shoulb be set to null.
*
- * @param federationPolicyInitializationContext the current context
- * @param oldInstance the existing (possibly null)
- * instance.
+ * @param policyContext the current context
+ * @param oldInstance the existing (possibly null) instance.
*
* @return an updated {@link FederationRouterPolicy}.
*
* @throws FederationPolicyInitializationException if the initalization cannot
- * be completed properly. The
- * oldInstance should be still
- * valid in case of failed
- * initialization.
+ * be completed properly. The oldInstance should be still valid in
+ * case of failed initialization.
*/
FederationRouterPolicy getRouterPolicy(
- FederationPolicyInitializationContext
- federationPolicyInitializationContext,
+ FederationPolicyInitializationContext policyContext,
FederationRouterPolicy oldInstance)
throws FederationPolicyInitializationException;
@@ -102,23 +92,24 @@ public interface FederationPolicyManager {
* store.
*
* @return a valid policy configuration representing this object
- * parametrization.
+ * parametrization.
*
* @throws FederationPolicyInitializationException if the current state cannot
- * be serialized properly
+ * be serialized properly
*/
SubClusterPolicyConfiguration serializeConf()
throws FederationPolicyInitializationException;
-
/**
* This method returns the queue this policy is configured for.
+ *
* @return the name of the queue.
*/
String getQueue();
/**
* This methods provides a setter for the queue this policy is specified for.
+ *
* @param queue the name of the queue.
*/
void setQueue(String queue);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java
new file mode 100644
index 0000000..e853744
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/AbstractAMRMProxyPolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+
+/**
+ * Base abstract class for {@link FederationAMRMProxyPolicy} implementations;
+ * its validation rejects a null or empty AMRM policy weight map.
+ */
+public abstract class AbstractAMRMProxyPolicy extends
+ AbstractConfigurableFederationPolicy implements FederationAMRMProxyPolicy {
+
+ @Override
+ public void validate(WeightedPolicyInfo newPolicyInfo)
+ throws FederationPolicyInitializationException {
+ super.validate(newPolicyInfo);
+ Map<SubClusterIdInfo, Float> newWeights =
+ newPolicyInfo.getAMRMPolicyWeights();
+ if (newWeights == null || newWeights.size() < 1) {
+ throw new FederationPolicyInitializationException(
+ "Weight vector cannot be null/empty.");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
new file mode 100644
index 0000000..679f4d5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/BroadcastAMRMProxyPolicy.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.UnknownSubclusterException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+/**
+ * An implementation of the {@link FederationAMRMProxyPolicy} that simply
+ * broadcasts each {@link ResourceRequest} to all the active sub-clusters.
+ */
+public class BroadcastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
+
+ private Set<SubClusterId> knownClusterIds = new HashSet<>();
+
+ @Override
+ public void reinitialize(
+ FederationPolicyInitializationContext policyContext)
+ throws FederationPolicyInitializationException {
+ // overrides reinitialize to skip the weight checks, which do not apply to
+ // this policy.
+ FederationPolicyInitializationContextValidator
+ .validate(policyContext, this.getClass().getCanonicalName());
+ setPolicyContext(policyContext);
+ }
+
+ @Override
+ public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
+ List<ResourceRequest> resourceRequests) throws YarnException {
+
+ Map<SubClusterId, SubClusterInfo> activeSubclusters =
+ getActiveSubclusters();
+
+ Map<SubClusterId, List<ResourceRequest>> answer = new HashMap<>();
+
+ // broadcast: each sub-cluster is mapped to the same request list instance
+ for (SubClusterId subClusterId : activeSubclusters.keySet()) {
+ answer.put(subClusterId, resourceRequests);
+ knownClusterIds.add(subClusterId);
+ }
+
+ return answer;
+ }
+
+ @Override
+ public void notifyOfResponse(SubClusterId subClusterId,
+ AllocateResponse response) throws YarnException {
+ if (!knownClusterIds.contains(subClusterId)) {
+ throw new UnknownSubclusterException(
+ "The response is received from a subcluster that is unknown to this "
+ + "policy.");
+ }
+ // only ids recorded by splitResourceRequests pass; the response is unused
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
index 4a3305c..0541df4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/FederationAMRMProxyPolicy.java
@@ -17,18 +17,18 @@
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+import java.util.List;
+import java.util.Map;
+
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import java.util.List;
-import java.util.Map;
-
/**
- * Implementors of this interface provide logic to split the list of {@link
- * ResourceRequest}s received by the AM among various RMs.
+ * Implementors of this interface provide logic to split the list of
+ * {@link ResourceRequest}s received by the AM among various RMs.
*/
public interface FederationAMRMProxyPolicy
extends ConfigurableFederationPolicy {
@@ -37,18 +37,17 @@ public interface FederationAMRMProxyPolicy
* Splits the {@link ResourceRequest}s from the client across one or more
* sub-clusters based on the policy semantics (e.g., broadcast, load-based).
*
- * @param resourceRequests the list of {@link ResourceRequest}s from the
- * AM to be split
+ * @param resourceRequests the list of {@link ResourceRequest}s from the AM to
+ * be split
*
* @return map of sub-cluster as identified by {@link SubClusterId} to the
- * list of {@link ResourceRequest}s that should be forwarded to it
+ * list of {@link ResourceRequest}s that should be forwarded to it
*
* @throws YarnException in case the request is malformed or no viable
- * sub-clusters can be found.
+ * sub-clusters can be found.
*/
Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
- List<ResourceRequest> resourceRequests)
- throws YarnException;
+ List<ResourceRequest> resourceRequests) throws YarnException;
/**
* This method should be invoked to notify the policy about responses being
@@ -60,7 +59,7 @@ public interface FederationAMRMProxyPolicy
*
* @throws YarnException in case the response is not valid
*/
- void notifyOfResponse(SubClusterId subClusterId,
- AllocateResponse response) throws YarnException;
+ void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response)
+ throws YarnException;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
new file mode 100644
index 0000000..283f89e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java
@@ -0,0 +1,583 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
+import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * An implementation of the {@link FederationAMRMProxyPolicy} interface that
+ * carefully multicasts the requests with the following behavior:
+ *
+ * <p>
+ * Host localized {@link ResourceRequest}s are always forwarded to the RM that
+ * owns the corresponding node, based on the feedback of a
+ * {@link SubClusterResolver}. If the {@link SubClusterResolver} cannot resolve
+ * this node we default to forwarding the {@link ResourceRequest} to the home
+ * sub-cluster.
+ * </p>
+ *
+ * <p>
+ * Rack localized {@link ResourceRequest}s are forwarded to the RMs that own
+ * the corresponding rack. Note that in some deployments each rack could be
+ * striped across multiple RMs. This policy respects that. If the
+ * {@link SubClusterResolver} cannot resolve this rack we default to forwarding
+ * the {@link ResourceRequest} to the home sub-cluster.
+ * </p>
+ *
+ * <p>
+ * ANY requests corresponding to node/rack local requests are forwarded only to
+ * the set of RMs that own the corresponding localized requests. The number of
+ * containers listed in each ANY is proportional to the number of localized
+ * container requests (associated to this ANY via the same allocateRequestId).
+ * </p>
+ *
+ * <p>
+ * ANY that are not associated to node/rack local requests are split among RMs
+ * based on the "weights" in the {@link WeightedPolicyInfo} configuration *and*
+ * headroom information. The {@code headroomAlpha} parameter of the policy
+ * configuration indicates how much headroom contributes to the splitting
+ * choice. Value of 1.0f indicates the weights are interpreted only as 0/1
+ * boolean but all splitting is based on the advertised headroom (fallback to
+ * 1/N for RMs that we don't have headroom info from). An {@code headroomAlpha}
+ * value of 0.0f means headroom is ignored and all splitting decisions are
+ * proportional to the "weights" in the configuration of the policy.
+ * </p>
+ *
+ * <p>
+ * ANY of zero size are forwarded to all known subclusters (i.e., subclusters
+ * where we scheduled containers before), as they may represent a user attempt
+ * to cancel a previous request (and we are mostly stateless now, so should
+ * forward to all known RMs).
+ * </p>
+ *
+ * <p>
+ * Invariants:
+ * </p>
+ *
+ * <p>
+ * The policy always excludes non-active RMs.
+ * </p>
+ *
+ * <p>
+ * The policy always excludes RMs that do not appear in the policy configuration
+ * weights, or have a weight of 0 (even if localized resources explicitly refer
+ * to them).
+ * </p>
+ *
+ * <p>
+ * (Bar rounding to closest ceiling of fractional containers) The sum of
+ * requests made to multiple RMs at the ANY level "adds-up" to the user request.
+ * The maximum possible excess in a given request is a number of containers less
+ * or equal to number of sub-clusters in the federation.
+ * </p>
+ */
+public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class);
+
+ private Map<SubClusterId, Float> weights;
+ private SubClusterResolver resolver;
+
+ private Map<SubClusterId, Resource> headroom;
+ private float hrAlpha;
+ private FederationStateStoreFacade federationFacade;
+ private AllocationBookkeeper bookkeeper;
+ private SubClusterId homeSubcluster;
+
+  /**
+   * Re-reads the policy configuration from the given context. When the parent
+   * class reports that nothing changed ({@code getIsDirty()} is false) the
+   * current state is kept. Otherwise the AMRM weights are re-keyed by
+   * {@link SubClusterId}, and the resolver, headroom alpha, state-store
+   * facade, bookkeeper and home sub-cluster are refreshed. Headroom collected
+   * so far is preserved.
+   *
+   * @param policyContext the context carrying the new configuration
+   * @throws FederationPolicyInitializationException if a non-empty weight map
+   *           contains no positive weight, or if the context has no home
+   *           sub-cluster; the previous policy info is restored before
+   *           throwing
+   */
+  @Override
+  public void reinitialize(
+      FederationPolicyInitializationContext policyContext)
+      throws FederationPolicyInitializationException {
+
+    // keep the current configuration around so it can be restored on failure
+    WeightedPolicyInfo previousPolicyInfo = getPolicyInfo();
+
+    super.reinitialize(policyContext);
+    if (!getIsDirty()) {
+      return;
+    }
+
+    WeightedPolicyInfo policy = getPolicyInfo();
+    Map<SubClusterIdInfo, Float> configuredWeights =
+        policy.getAMRMPolicyWeights();
+    Map<SubClusterId, Float> newWeightsConverted = new HashMap<>();
+
+    // a missing or empty weight map is accepted here; only a non-empty map
+    // without any positive weight is rejected
+    boolean noPositiveWeight =
+        configuredWeights != null && configuredWeights.size() != 0;
+    if (configuredWeights != null) {
+      for (Map.Entry<SubClusterIdInfo, Float> configured : configuredWeights
+          .entrySet()) {
+        if (configured.getValue() > 0) {
+          noPositiveWeight = false;
+        }
+        newWeightsConverted.put(configured.getKey().toId(),
+            configured.getValue());
+      }
+    }
+    if (noPositiveWeight) {
+      // reset the policyInfo and throw
+      setPolicyInfo(previousPolicyInfo);
+      throw new FederationPolicyInitializationException(
+          "The weights used to configure "
+              + "this policy are all set to zero! (no ResourceRequest could be "
+              + "forwarded with this setting.)");
+    }
+
+    SubClusterId configuredHome = policyContext.getHomeSubcluster();
+    if (configuredHome == null) {
+      setPolicyInfo(previousPolicyInfo);
+      throw new FederationPolicyInitializationException("The homeSubcluster "
+          + "filed in the context must be initialized to use this policy");
+    }
+
+    // validation passed: publish the new configuration
+    weights = newWeightsConverted;
+    hrAlpha = policy.getHeadroomAlpha();
+    resolver = policyContext.getFederationSubclusterResolver();
+    this.federationFacade = policyContext.getFederationStateStoreFacade();
+    this.homeSubcluster = configuredHome;
+    this.bookkeeper = new AllocationBookkeeper();
+
+    // headroom reported so far survives a re-initialization
+    if (headroom == null) {
+      headroom = new ConcurrentHashMap<>();
+    }
+
+  }
+
+ @Override
+ public void notifyOfResponse(SubClusterId subClusterId,
+ AllocateResponse response) throws YarnException {
+ // stateless policy does not care about responses except tracking headroom
+ headroom.put(subClusterId, response.getAvailableResources());
+ }
+
+  /**
+   * Splits the given requests among sub-clusters. Non-ANY requests are first
+   * resolved as node names, then as rack names, and finally default to the
+   * home sub-cluster; only sub-clusters that are active and enabled receive
+   * them. ANY requests are set aside and split afterwards, once the
+   * localized requests have been accounted for.
+   *
+   * @param resourceRequests the requests to split
+   * @return the requests to forward to each sub-cluster
+   * @throws YarnException if the bookkeeper cannot be re-initialized (e.g.,
+   *           no sub-cluster is both active and enabled)
+   */
+  @Override
+  public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(
+      List<ResourceRequest> resourceRequests) throws YarnException {
+
+    // start a fresh round of statistics, initialized with active subclusters
+    bookkeeper.reinitialize(federationFacade.getSubClusters(true));
+
+    List<ResourceRequest> anyRequests = new ArrayList<ResourceRequest>();
+
+    for (ResourceRequest request : resourceRequests) {
+
+      // ANY: accumulated, handled after the loop
+      if (ResourceRequest.isAnyLocation(request.getResourceName())) {
+        anyRequests.add(request);
+        continue;
+      }
+
+      // first attempt: interpret the resource name as a node
+      SubClusterId nodeOwner = null;
+      try {
+        nodeOwner = resolver.getSubClusterForNode(request.getResourceName());
+      } catch (YarnException e) {
+        // expected for rack names, which cannot be told apart from node
+        // names; a single message is logged further down
+      }
+      if (bookkeeper.isActiveAndEnabled(nodeOwner)) {
+        bookkeeper.addLocalizedNodeRR(nodeOwner, request);
+        continue;
+      }
+
+      // second attempt: interpret the resource name as a rack
+      Set<SubClusterId> rackOwners = null;
+      try {
+        rackOwners = resolver.getSubClustersForRack(request.getResourceName());
+      } catch (YarnException e) {
+        // expected for node names, which cannot be told apart from rack
+        // names; a single message is logged further down
+      }
+      if (rackOwners != null && rackOwners.size() > 0) {
+        for (SubClusterId rackOwner : rackOwners) {
+          if (bookkeeper.isActiveAndEnabled(rackOwner)) {
+            bookkeeper.addRackRR(rackOwner, request);
+          }
+        }
+        continue;
+      }
+
+      // neither node nor rack could be mapped: default to home subcluster
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("ERROR resolving sub-cluster for resourceName: "
+            + request.getResourceName()
+            + " we are falling back to homeSubCluster:" + homeSubcluster);
+      }
+
+      // an inactive home subcluster means the request is dropped
+      if (!bookkeeper.isActiveAndEnabled(homeSubcluster)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The homeSubCluster (" + homeSubcluster + ") we are "
+              + "defaulting to is not active, the ResourceRequest "
+              + "will be ignored.");
+        }
+        continue;
+      }
+      bookkeeper.addLocalizedNodeRR(homeSubcluster, request);
+    }
+
+    // handle all non-localized requests (ANY)
+    splitAnyRequests(anyRequests, bookkeeper);
+
+    return bookkeeper.getAnswer();
+  }
+
+  /**
+   * Splits every non-localized (ANY) request among sub-clusters. An ANY that
+   * shares its allocation id with localized requests is restricted to the
+   * sub-clusters those requests were sent to; any other ANY may go to all
+   * active and enabled sub-clusters.
+   */
+  private void splitAnyRequests(List<ResourceRequest> originalResourceRequests,
+      AllocationBookkeeper allocationBookkeeper) throws YarnException {
+
+    for (ResourceRequest anyRequest : originalResourceRequests) {
+
+      // FIRST: choose the candidate sub-clusters for this request
+      Set<SubClusterId> localizedTargets = allocationBookkeeper
+          .getSubClustersForId(anyRequest.getAllocationRequestId());
+      Set<SubClusterId> candidates = (localizedTargets != null)
+          ? localizedTargets
+          : allocationBookkeeper.getActiveAndEnabledSC();
+
+      // SECOND: decide how many containers to ask of each candidate
+      splitIndividualAny(anyRequest, candidates, allocationBookkeeper);
+    }
+  }
+
+  /**
+   * Splits one ANY {@link ResourceRequest} among the given sub-clusters and
+   * records each resulting share in the bookkeeper. The share of a
+   * sub-cluster is based on the "count" of the containers that require
+   * locality in it (when the request is associated with localized asks), or
+   * otherwise on a blend of headroom and policy "weights" controlled by
+   * {@code hrAlpha}.
+   */
+  private void splitIndividualAny(ResourceRequest originalResourceRequest,
+      Set<SubClusterId> targetSubclusters,
+      AllocationBookkeeper allocationBookkeeper) {
+
+    long allocationId = originalResourceRequest.getAllocationRequestId();
+
+    for (SubClusterId targetId : targetSubclusters) {
+      float requested = originalResourceRequest.getNumContainers();
+
+      // A zero-sized ANY may be the user's way of cancelling an earlier
+      // request, so it is forwarded unchanged to every RM we already have
+      // headroom information from.
+      if (requested == 0 && headroom.containsKey(targetId)) {
+        allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
+      }
+
+      float share;
+      boolean hasLocalizedAsks =
+          allocationBookkeeper.getSubClustersForId(allocationId) != null;
+      if (hasLocalizedAsks) {
+        // follow the ratio of the localized asks with the same allocation id
+        share = getLocalityBasedWeighting(allocationId, targetId,
+            allocationBookkeeper);
+      } else {
+        // blend load and policy configuration; hrAlpha sets how much the
+        // headroom influences the decision
+        float fromHeadroom =
+            getHeadroomWeighting(targetId, allocationBookkeeper);
+        float fromPolicy =
+            getPolicyConfigWeighting(targetId, allocationBookkeeper);
+        share = hrAlpha * fromHeadroom + (1 - hrAlpha) * fromPolicy;
+      }
+
+      float scaled = requested * share;
+      if (!(scaled > 0)) {
+        // nothing to ask of this sub-cluster
+        continue;
+      }
+
+      // copy the original request, then overwrite the container count with
+      // this sub-cluster's share rounded up
+      ResourceRequest out =
+          ResourceRequest.newInstance(originalResourceRequest.getPriority(),
+              originalResourceRequest.getResourceName(),
+              originalResourceRequest.getCapability(),
+              originalResourceRequest.getNumContainers(),
+              originalResourceRequest.getRelaxLocality(),
+              originalResourceRequest.getNodeLabelExpression(),
+              originalResourceRequest.getExecutionTypeRequest());
+      out.setAllocationRequestId(allocationId);
+      out.setNumContainers((int) Math.ceil(scaled));
+      if (ResourceRequest.isAnyLocation(out.getResourceName())) {
+        allocationBookkeeper.addAnyRR(targetId, out);
+      } else {
+        allocationBookkeeper.addRackRR(targetId, out);
+      }
+    }
+  }
+
+  /**
+   * Compute the weight to assign to a subcluster based on how many local
+   * requests a subcluster is target of.
+   *
+   * The weight is the fraction of the localized containers requested under
+   * {@code reqId} that target {@code targetId}. The total is restricted to
+   * this allocation id: dividing by the total across all allocation ids would
+   * make the shares of one ANY request sum to less than 1 whenever other
+   * allocation ids also carry localized requests.
+   *
+   * @param reqId the allocation request id of the ANY being split
+   * @param targetId the sub-cluster to compute the weight for
+   * @param allocationBookkeeper the statistics accumulated for this split
+   * @return a weight in [0, 1]; 0 when {@code reqId} has no localized
+   *         containers
+   */
+  private float getLocalityBasedWeighting(long reqId, SubClusterId targetId,
+      AllocationBookkeeper allocationBookkeeper) {
+    Set<SubClusterId> localizedTargets =
+        allocationBookkeeper.getSubClustersForId(reqId);
+    if (localizedTargets == null) {
+      // no localized request is associated with this allocation id
+      return 0;
+    }
+
+    long totForRequest = 0;
+    for (SubClusterId localizedTarget : localizedTargets) {
+      totForRequest +=
+          allocationBookkeeper.getNumLocalizedContainers(reqId, localizedTarget);
+    }
+
+    float totWeight = totForRequest;
+    float localWeight =
+        allocationBookkeeper.getNumLocalizedContainers(reqId, targetId);
+    return totWeight > 0 ? localWeight / totWeight : 0;
+  }
+
+  /**
+   * Compute the "weighting" to give to a subcluster based on the configured
+   * policy weights: its own weight divided by the total policy weight
+   * accumulated by the bookkeeper. Returns 0 when the subcluster has no
+   * configured weight or the total is not positive.
+   */
+  private float getPolicyConfigWeighting(SubClusterId targetId,
+      AllocationBookkeeper allocationBookkeeper) {
+    Float configuredWeight = weights.get(targetId);
+    if (configuredWeight == null) {
+      return 0;
+    }
+    float totalWeight = allocationBookkeeper.totPolicyWeight;
+    if (totalWeight > 0) {
+      return configuredWeight / totalWeight;
+    }
+    return 0;
+  }
+
+  /**
+   * Compute the weighting based on available headroom. This is proportional to
+   * the available headroom memory announced by RM, or to 1/N for RMs we have
+   * not seen yet. If the total known headroom is zero, we fallback to 1/N
+   * again (N being the number of active and enabled sub-clusters).
+   */
+  private float getHeadroomWeighting(SubClusterId targetId,
+      AllocationBookkeeper allocationBookkeeper) {
+
+    // (note: getActiveAndEnabledSC should never be null/zero)
+    float numEnabled =
+        (float) allocationBookkeeper.getActiveAndEnabledSC().size();
+    float totalHeadroom = allocationBookkeeper.totHeadroomMemory;
+
+    // baseline 1/N: used when this sub-cluster has not reported headroom yet,
+    // and to stay clear of a division by zero
+    if (!headroom.containsKey(targetId) || !(totalHeadroom > 0)) {
+      return 1 / numEnabled;
+    }
+
+    // portion of the active/enabled RMs that have reported their headroom;
+    // applied as an adjustment factor for the missing information, so that
+    // the sum of allocated containers closely approximates what the user
+    // asked (small excess)
+    float ratioHeadroomKnown =
+        allocationBookkeeper.totHeadRoomEnabledRMs / numEnabled;
+
+    // headroom memory of this sub-cluster over the total known headroom
+    return (headroom.get(targetId).getMemorySize() / totalHeadroom)
+        * (ratioHeadroomKnown);
+  }
+
+ /**
+ * This helper class is used to book-keep the requests made to each
+ * subcluster, and maintain useful statistics to split ANY requests.
+ */
+ private final class AllocationBookkeeper {
+
+ // the answer being accumulated
+ private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<>();
+
+ // stores how many containers we have allocated in each RM for localized
+ // asks, used to correctly "spread" the corresponding ANY
+ private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM =
+ new HashMap<>();
+
+ private Set<SubClusterId> activeAndEnabledSC = new HashSet<>();
+ private long totNumLocalizedContainers = 0;
+ private float totHeadroomMemory = 0;
+ private int totHeadRoomEnabledRMs = 0;
+ private float totPolicyWeight = 0;
+
+ private void reinitialize(
+ Map<SubClusterId, SubClusterInfo> activeSubclusters)
+ throws YarnException {
+
+ // reset data structures
+ answer.clear();
+ countContainersPerRM.clear();
+ activeAndEnabledSC.clear();
+ totNumLocalizedContainers = 0;
+ totHeadroomMemory = 0;
+ totHeadRoomEnabledRMs = 0;
+ totPolicyWeight = 0;
+
+ // pre-compute the set of subclusters that are both active and enabled by
+ // the policy weights, and accumulate their total weight
+ for (Map.Entry<SubClusterId, Float> entry : weights.entrySet()) {
+ if (entry.getValue() > 0
+ && activeSubclusters.containsKey(entry.getKey())) {
+ activeAndEnabledSC.add(entry.getKey());
+ totPolicyWeight += entry.getValue();
+ }
+ }
+
+ if (activeAndEnabledSC.size() < 1) {
+ throw new NoActiveSubclustersException(
+ "None of the subclusters enabled in this policy (weight>0) are "
+ + "currently active we cannot forward the ResourceRequest(s)");
+ }
+
+ // pre-compute headroom-based weights for active/enabled subclusters
+ for (Map.Entry<SubClusterId, Resource> r : headroom.entrySet()) {
+ if (activeAndEnabledSC.contains(r.getKey())) {
+ totHeadroomMemory += r.getValue().getMemorySize();
+ totHeadRoomEnabledRMs++;
+ }
+ }
+
+ }
+
+ /**
+ * Add to the answer a localized node request, and keeps track of statistics
+ * on a per-allocation-id and per-subcluster bases.
+ */
+ private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
+ Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
+
+ if (!countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
+ countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap<>());
+ }
+ if (!countContainersPerRM.get(rr.getAllocationRequestId())
+ .containsKey(targetId)) {
+ countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId,
+ new AtomicLong(0));
+ }
+ countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId)
+ .addAndGet(rr.getNumContainers());
+
+ totNumLocalizedContainers += rr.getNumContainers();
+
+ internalAddToAnswer(targetId, rr);
+ }
+
+ /**
+ * Add a rack-local request to the final asnwer.
+ */
+ public void addRackRR(SubClusterId targetId, ResourceRequest rr) {
+ Preconditions.checkArgument(!rr.isAnyLocation(rr.getResourceName()));
+ internalAddToAnswer(targetId, rr);
+ }
+
+ /**
+ * Add an ANY request to the final answer.
+ */
+ private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
+ Preconditions.checkArgument(rr.isAnyLocation(rr.getResourceName()));
+ internalAddToAnswer(targetId, rr);
+ }
+
+ private void internalAddToAnswer(SubClusterId targetId,
+ ResourceRequest partialRR) {
+ if (!answer.containsKey(targetId)) {
+ answer.put(targetId, new ArrayList<ResourceRequest>());
+ }
+ answer.get(targetId).add(partialRR);
+ }
+
+ /**
+ * Return all known subclusters associated with an allocation id.
+ *
+ * @param allocationId the allocation id considered
+ *
+ * @return the list of {@link SubClusterId}s associated with this allocation
+ * id
+ */
+ private Set<SubClusterId> getSubClustersForId(long allocationId) {
+ if (countContainersPerRM.get(allocationId) == null) {
+ return null;
+ }
+ return countContainersPerRM.get(allocationId).keySet();
+ }
+
+ /**
+ * Return the answer accumulated so far.
+ *
+ * @return the answer
+ */
+ private Map<SubClusterId, List<ResourceRequest>> getAnswer() {
+ return answer;
+ }
+
+ /**
+ * Return the set of sub-clusters that are both active and allowed by our
+ * policy (weight > 0).
+ *
+ * @return a set of active and enabled {@link SubClusterId}s
+ */
+ private Set<SubClusterId> getActiveAndEnabledSC() {
+ return activeAndEnabledSC;
+ }
+
+ /**
+ * Return the total number of container coming from localized requests.
+ */
+ private long getTotNumLocalizedContainers() {
+ return totNumLocalizedContainers;
+ }
+
+ /**
+ * Returns the number of containers matching an allocation Id that are
+ * localized in the targetId subcluster.
+ */
+ private long getNumLocalizedContainers(long allocationId,
+ SubClusterId targetId) {
+ AtomicLong c = countContainersPerRM.get(allocationId).get(targetId);
+ return c == null ? 0 : c.get();
+ }
+
+ /**
+ * Returns true is the subcluster request is both active and enabled.
+ */
+ private boolean isActiveAndEnabled(SubClusterId targetId) {
+ if (targetId == null) {
+ return false;
+ } else {
+ return getActiveAndEnabledSC().contains(targetId);
+ }
+ }
+
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java
index 99da20b..ef72647 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/package-info.java
@@ -17,4 +17,3 @@
*/
/** AMRMPRoxy policies. **/
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
index a0fa37f..62eb03b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/WeightedPolicyInfo.java
@@ -17,10 +17,19 @@
package org.apache.hadoop.yarn.server.federation.policies.dao;
-import com.sun.jersey.api.json.JSONConfiguration;
-import com.sun.jersey.api.json.JSONJAXBContext;
-import com.sun.jersey.api.json.JSONMarshaller;
-import com.sun.jersey.api.json.JSONUnmarshaller;
+import java.io.StringReader;
+import java.io.StringWriter;
+import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Marshaller;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -29,24 +38,16 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.xml.bind.JAXBException;
-import javax.xml.bind.Marshaller;
-import javax.xml.bind.annotation.XmlAccessType;
-import javax.xml.bind.annotation.XmlAccessorType;
-import javax.xml.bind.annotation.XmlRootElement;
-import java.io.StringReader;
-import java.io.StringWriter;
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.Map;
+import com.sun.jersey.api.json.JSONConfiguration;
+import com.sun.jersey.api.json.JSONJAXBContext;
+import com.sun.jersey.api.json.JSONMarshaller;
+import com.sun.jersey.api.json.JSONUnmarshaller;
/**
* This is a DAO class for the configuration of parameteres for federation
* policies. This generalizes several possible configurations as two lists of
- * {@link SubClusterIdInfo} and corresponding weights as a
- * {@link Float}. The interpretation of the weight is left to the logic in
- * the policy.
+ * {@link SubClusterIdInfo} and corresponding weights as a {@link Float}. The
+ * interpretation of the weight is left to the logic in the policy.
*/
@InterfaceAudience.Private
@@ -57,12 +58,14 @@ public class WeightedPolicyInfo {
private static final Logger LOG =
LoggerFactory.getLogger(WeightedPolicyInfo.class);
-
+ private static JSONJAXBContext jsonjaxbContext = initContext();
private Map<SubClusterIdInfo, Float> routerPolicyWeights = new HashMap<>();
private Map<SubClusterIdInfo, Float> amrmPolicyWeights = new HashMap<>();
private float headroomAlpha;
- private static JSONJAXBContext jsonjaxbContext = initContext();
+ public WeightedPolicyInfo() {
+ // JAXB needs this
+ }
private static JSONJAXBContext initContext() {
try {
@@ -74,46 +77,6 @@ public class WeightedPolicyInfo {
return null;
}
- public WeightedPolicyInfo() {
- //JAXB needs this
- }
-
- /**
- * Setter method for Router weights.
- *
- * @param policyWeights the router weights.
- */
- public void setRouterPolicyWeights(
- Map<SubClusterIdInfo, Float> policyWeights) {
- this.routerPolicyWeights = policyWeights;
- }
-
- /**
- * Setter method for ARMRMProxy weights.
- *
- * @param policyWeights the amrmproxy weights.
- */
- public void setAMRMPolicyWeights(
- Map<SubClusterIdInfo, Float> policyWeights) {
- this.amrmPolicyWeights = policyWeights;
- }
-
- /**
- * Getter of the router weights.
- * @return the router weights.
- */
- public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() {
- return routerPolicyWeights;
- }
-
- /**
- * Getter for AMRMProxy weights.
- * @return the AMRMProxy weights.
- */
- public Map<SubClusterIdInfo, Float> getAMRMPolicyWeights() {
- return amrmPolicyWeights;
- }
-
/**
* Deserializes a {@link WeightedPolicyInfo} from a byte UTF-8 JSON
* representation.
@@ -123,14 +86,14 @@ public class WeightedPolicyInfo {
* @return the {@link WeightedPolicyInfo} represented.
*
* @throws FederationPolicyInitializationException if a deserializaiton error
- * occurs.
+ * occurs.
*/
public static WeightedPolicyInfo fromByteBuffer(ByteBuffer bb)
throws FederationPolicyInitializationException {
if (jsonjaxbContext == null) {
- throw new FederationPolicyInitializationException("JSONJAXBContext should"
- + " not be null.");
+ throw new FederationPolicyInitializationException(
+ "JSONJAXBContext should" + " not be null.");
}
try {
@@ -139,9 +102,8 @@ public class WeightedPolicyInfo {
bb.get(bytes);
String params = new String(bytes, Charset.forName("UTF-8"));
- WeightedPolicyInfo weightedPolicyInfo = unmarshaller
- .unmarshalFromJSON(new StringReader(params),
- WeightedPolicyInfo.class);
+ WeightedPolicyInfo weightedPolicyInfo = unmarshaller.unmarshalFromJSON(
+ new StringReader(params), WeightedPolicyInfo.class);
return weightedPolicyInfo;
} catch (JAXBException j) {
throw new FederationPolicyInitializationException(j);
@@ -149,19 +111,56 @@ public class WeightedPolicyInfo {
}
/**
- * Converts the policy into a byte array representation in the input {@link
- * ByteBuffer}.
+ * Getter of the router weights.
+ *
+ * @return the router weights.
+ */
+ public Map<SubClusterIdInfo, Float> getRouterPolicyWeights() {
+ return routerPolicyWeights;
+ }
+
+ /**
+ * Setter method for Router weights.
+ *
+ * @param policyWeights the router weights.
+ */
+ public void setRouterPolicyWeights(
+ Map<SubClusterIdInfo, Float> policyWeights) {
+ this.routerPolicyWeights = policyWeights;
+ }
+
+ /**
+ * Getter for AMRMProxy weights.
+ *
+ * @return the AMRMProxy weights.
+ */
+ public Map<SubClusterIdInfo, Float> getAMRMPolicyWeights() {
+ return amrmPolicyWeights;
+ }
+
+ /**
+ * Setter method for ARMRMProxy weights.
+ *
+ * @param policyWeights the amrmproxy weights.
+ */
+ public void setAMRMPolicyWeights(Map<SubClusterIdInfo, Float> policyWeights) {
+ this.amrmPolicyWeights = policyWeights;
+ }
+
+ /**
+ * Converts the policy into a byte array representation in the input
+ * {@link ByteBuffer}.
*
* @return byte array representation of this policy configuration.
*
* @throws FederationPolicyInitializationException if a serialization error
- * occurs.
+ * occurs.
*/
public ByteBuffer toByteBuffer()
throws FederationPolicyInitializationException {
if (jsonjaxbContext == null) {
- throw new FederationPolicyInitializationException("JSONJAXBContext should"
- + " not be null.");
+ throw new FederationPolicyInitializationException(
+ "JSONJAXBContext should" + " not be null.");
}
try {
String s = toJSONString();
@@ -186,22 +185,21 @@ public class WeightedPolicyInfo {
return false;
}
- WeightedPolicyInfo otherPolicy =
- (WeightedPolicyInfo) other;
+ WeightedPolicyInfo otherPolicy = (WeightedPolicyInfo) other;
Map<SubClusterIdInfo, Float> otherAMRMWeights =
otherPolicy.getAMRMPolicyWeights();
Map<SubClusterIdInfo, Float> otherRouterWeights =
otherPolicy.getRouterPolicyWeights();
- boolean amrmWeightsMatch = otherAMRMWeights != null &&
- getAMRMPolicyWeights() != null &&
- CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(),
- getAMRMPolicyWeights().entrySet());
+ boolean amrmWeightsMatch =
+ otherAMRMWeights != null && getAMRMPolicyWeights() != null
+ && CollectionUtils.isEqualCollection(otherAMRMWeights.entrySet(),
+ getAMRMPolicyWeights().entrySet());
- boolean routerWeightsMatch = otherRouterWeights != null &&
- getRouterPolicyWeights() != null &&
- CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(),
- getRouterPolicyWeights().entrySet());
+ boolean routerWeightsMatch =
+ otherRouterWeights != null && getRouterPolicyWeights() != null
+ && CollectionUtils.isEqualCollection(otherRouterWeights.entrySet(),
+ getRouterPolicyWeights().entrySet());
return amrmWeightsMatch && routerWeightsMatch;
}
@@ -215,10 +213,10 @@ public class WeightedPolicyInfo {
* Return the parameter headroomAlpha, used by policies that balance
* weight-based and load-based considerations in their decisions.
*
- * For policies that use this parameter, values close to 1 indicate that
- * most of the decision should be based on currently observed headroom from
- * various sub-clusters, values close to zero, indicate that the decision
- * should be mostly based on weights and practically ignore current load.
+ * For policies that use this parameter, values close to 1 indicate that most
+ * of the decision should be based on currently observed headroom from various
+ * sub-clusters; values close to zero indicate that the decision should be
+ * mostly based on weights and practically ignore current load.
*
* @return the value of headroomAlpha.
*/
@@ -227,13 +225,13 @@ public class WeightedPolicyInfo {
}
/**
- * Set the parameter headroomAlpha, used by policies that balance
- * weight-based and load-based considerations in their decisions.
+ * Set the parameter headroomAlpha, used by policies that balance weight-based
+ * and load-based considerations in their decisions.
*
- * For policies that use this parameter, values close to 1 indicate that
- * most of the decision should be based on currently observed headroom from
- * various sub-clusters, values close to zero, indicate that the decision
- * should be mostly based on weights and practically ignore current load.
+ * For policies that use this parameter, values close to 1 indicate that most
+ * of the decision should be based on currently observed headroom from various
+ * sub-clusters; values close to zero indicate that the decision should be
+ * mostly based on weights and practically ignore current load.
*
* @param headroomAlpha the value to use for balancing.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
index 43f5b83..c292e52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/dao/package-info.java
@@ -17,4 +17,3 @@
*/
/** DAO objects for serializing/deserializing policy configurations. **/
package org.apache.hadoop.yarn.server.federation.policies.dao;
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java
index 3318da9..ad2d543 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/exceptions/package-info.java
@@ -17,4 +17,3 @@
*/
/** Exceptions for policies. **/
package org.apache.hadoop.yarn.server.federation.policies.exceptions;
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java
index 7d9a121..fa3fcc5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/package-info.java
@@ -17,4 +17,3 @@
*/
/** Federation Policies. **/
package org.apache.hadoop.yarn.server.federation.policies;
-
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
new file mode 100644
index 0000000..f49af1d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import java.util.Map;
+
+import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+
+/**
+ * Base abstract class for {@link FederationRouterPolicy} implementations that
+ * provides common validation for reinitialization.
+ */
+public abstract class AbstractRouterPolicy extends
+ AbstractConfigurableFederationPolicy implements FederationRouterPolicy {
+
+ @Override
+ public void validate(WeightedPolicyInfo newPolicyInfo)
+ throws FederationPolicyInitializationException {
+ super.validate(newPolicyInfo);
+ Map<SubClusterIdInfo, Float> newWeights =
+ newPolicyInfo.getRouterPolicyWeights();
+ if (newWeights == null || newWeights.size() < 1) {
+ throw new FederationPolicyInitializationException(
+ "Weight vector cannot be null/empty.");
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java
deleted file mode 100644
index e888979..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/BaseWeightedRouterPolicy.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies.router;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
-import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
-import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
-
-import java.util.Map;
-
-/**
- * Abstract class provides common validation of reinitialize(), for all
- * policies that are "weight-based".
- */
-public abstract class BaseWeightedRouterPolicy
- implements FederationRouterPolicy {
-
- private WeightedPolicyInfo policyInfo = null;
- private FederationPolicyInitializationContext policyContext;
-
- public BaseWeightedRouterPolicy() {
- }
-
- @Override
- public void reinitialize(FederationPolicyInitializationContext
- federationPolicyContext)
- throws FederationPolicyInitializationException {
- FederationPolicyInitializationContextValidator
- .validate(federationPolicyContext, this.getClass().getCanonicalName());
-
- // perform consistency checks
- WeightedPolicyInfo newPolicyInfo = WeightedPolicyInfo
- .fromByteBuffer(
- federationPolicyContext.getSubClusterPolicyConfiguration()
- .getParams());
-
- // if nothing has changed skip the rest of initialization
- if (policyInfo != null && policyInfo.equals(newPolicyInfo)) {
- return;
- }
-
- validate(newPolicyInfo);
- setPolicyInfo(newPolicyInfo);
- this.policyContext = federationPolicyContext;
- }
-
- /**
- * Overridable validation step for the policy configuration.
- * @param newPolicyInfo the configuration to test.
- * @throws FederationPolicyInitializationException if the configuration is
- * not valid.
- */
- public void validate(WeightedPolicyInfo newPolicyInfo) throws
- FederationPolicyInitializationException {
- if (newPolicyInfo == null) {
- throw new FederationPolicyInitializationException("The policy to "
- + "validate should not be null.");
- }
- Map<SubClusterIdInfo, Float> newWeights =
- newPolicyInfo.getRouterPolicyWeights();
- if (newWeights == null || newWeights.size() < 1) {
- throw new FederationPolicyInitializationException(
- "Weight vector cannot be null/empty.");
- }
- }
-
-
- /**
- * Getter method for the configuration weights.
- *
- * @return the {@link WeightedPolicyInfo} representing the policy
- * configuration.
- */
- public WeightedPolicyInfo getPolicyInfo() {
- return policyInfo;
- }
-
- /**
- * Setter method for the configuration weights.
- *
- * @param policyInfo the {@link WeightedPolicyInfo} representing the policy
- * configuration.
- */
- public void setPolicyInfo(
- WeightedPolicyInfo policyInfo) {
- this.policyInfo = policyInfo;
- }
-
- /**
- * Getter method for the {@link FederationPolicyInitializationContext}.
- * @return the context for this policy.
- */
- public FederationPolicyInitializationContext getPolicyContext() {
- return policyContext;
- }
-
- /**
- * Setter method for the {@link FederationPolicyInitializationContext}.
- * @param policyContext the context to assign to this policy.
- */
- public void setPolicyContext(
- FederationPolicyInitializationContext policyContext) {
- this.policyContext = policyContext;
- }
-
- /**
- * This methods gets active subclusters map from the {@code
- * FederationStateStoreFacade} and validate it not being null/empty.
- *
- * @return the map of ids to info for all active subclusters.
- * @throws YarnException if we can't get the list.
- */
- protected Map<SubClusterId, SubClusterInfo> getActiveSubclusters()
- throws YarnException {
-
- Map<SubClusterId, SubClusterInfo> activeSubclusters = getPolicyContext()
- .getFederationStateStoreFacade().getSubClusters(true);
-
- if (activeSubclusters == null || activeSubclusters.size() < 1) {
- throw new NoActiveSubclustersException(
- "Zero active subclusters, cannot pick where to send job.");
- }
- return activeSubclusters;
- }
-
-
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1793757d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
index 42c86cc..90ea0a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/FederationRouterPolicy.java
@@ -35,11 +35,10 @@ public interface FederationRouterPolicy extends ConfigurableFederationPolicy {
* @param appSubmissionContext the context for the app being submitted.
*
* @return the sub-cluster as identified by {@link SubClusterId} to route the
- * request to.
+ * request to.
*
* @throws YarnException if the policy cannot determine a viable subcluster.
*/
SubClusterId getHomeSubcluster(
- ApplicationSubmissionContext appSubmissionContext)
- throws YarnException;
+ ApplicationSubmissionContext appSubmissionContext) throws YarnException;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org