You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/05/27 00:11:37 UTC
[20/50] [abbrv] hadoop git commit: YARN-5676. Add a HashBasedRouterPolicy, and small policies and test refactoring. (Carlo Curino via Subru).
YARN-5676. Add a HashBasedRouterPolicy, and small policies and test refactoring. (Carlo Curino via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/955faca4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/955faca4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/955faca4
Branch: refs/heads/YARN-2915
Commit: 955faca4d42b89a4b0bca62a0293c98caf982f28
Parents: cd7ec98
Author: Subru Krishnan <su...@apache.org>
Authored: Tue Nov 22 15:02:22 2016 -0800
Committer: Subru Krishnan <su...@apache.org>
Committed: Fri May 26 17:02:09 2017 -0700
----------------------------------------------------------------------
.../hadoop/yarn/conf/YarnConfiguration.java | 3 +-
.../policies/AbstractPolicyManager.java | 175 -----------------
.../policies/FederationPolicyManager.java | 117 ------------
.../PriorityBroadcastPolicyManager.java | 66 -------
.../federation/policies/RouterPolicyFacade.java | 1 +
.../policies/UniformBroadcastPolicyManager.java | 56 ------
.../policies/WeightedLocalityPolicyManager.java | 67 -------
.../policies/manager/AbstractPolicyManager.java | 190 +++++++++++++++++++
.../manager/FederationPolicyManager.java | 118 ++++++++++++
.../manager/HashBroadcastPolicyManager.java | 38 ++++
.../manager/PriorityBroadcastPolicyManager.java | 66 +++++++
.../manager/UniformBroadcastPolicyManager.java | 44 +++++
.../manager/WeightedLocalityPolicyManager.java | 67 +++++++
.../policies/manager/package-info.java | 19 ++
.../policies/router/AbstractRouterPolicy.java | 19 ++
.../policies/router/HashBasedRouterPolicy.java | 81 ++++++++
.../policies/router/LoadBasedRouterPolicy.java | 3 +
.../policies/router/PriorityRouterPolicy.java | 3 +
.../router/UniformRandomRouterPolicy.java | 10 +-
.../router/WeightedRandomRouterPolicy.java | 3 +
.../policies/BaseFederationPoliciesTest.java | 17 +-
.../policies/BasePolicyManagerTest.java | 108 -----------
...ionPolicyInitializationContextValidator.java | 1 +
.../TestPriorityBroadcastPolicyManager.java | 72 -------
.../policies/TestRouterPolicyFacade.java | 2 +
.../TestUniformBroadcastPolicyManager.java | 40 ----
.../TestWeightedLocalityPolicyManager.java | 79 --------
.../policies/manager/BasePolicyManagerTest.java | 104 ++++++++++
.../TestHashBasedBroadcastPolicyManager.java | 40 ++++
.../TestPriorityBroadcastPolicyManager.java | 72 +++++++
.../TestUniformBroadcastPolicyManager.java | 40 ++++
.../TestWeightedLocalityPolicyManager.java | 79 ++++++++
.../policies/router/BaseRouterPoliciesTest.java | 51 +++++
.../router/TestHashBasedRouterPolicy.java | 83 ++++++++
.../router/TestLoadBasedRouterPolicy.java | 3 +-
.../router/TestPriorityRouterPolicy.java | 3 +-
.../router/TestUniformRandomRouterPolicy.java | 3 +-
.../router/TestWeightedRandomRouterPolicy.java | 15 +-
38 files changed, 1160 insertions(+), 798 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 860d8a8..3436481 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2582,7 +2582,8 @@ public class YarnConfiguration extends Configuration {
+ "policy-manager";
public static final String DEFAULT_FEDERATION_POLICY_MANAGER = "org.apache"
- + ".hadoop.yarn.server.federation.policies.UniformBroadcastPolicyManager";
+ + ".hadoop.yarn.server.federation.policies"
+ + ".manager.UniformBroadcastPolicyManager";
public static final String FEDERATION_POLICY_MANAGER_PARAMS =
FEDERATION_PREFIX + "policy-manager-params";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
deleted file mode 100644
index e77f2e3..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * This class provides basic implementation for common methods that multiple
- * policies will need to implement.
- */
-public abstract class AbstractPolicyManager implements
- FederationPolicyManager {
-
- private String queue;
- @SuppressWarnings("checkstyle:visibilitymodifier")
- protected Class routerFederationPolicy;
- @SuppressWarnings("checkstyle:visibilitymodifier")
- protected Class amrmProxyFederationPolicy;
-
- public static final Logger LOG =
- LoggerFactory.getLogger(AbstractPolicyManager.class);
- /**
- * This default implementation validates the
- * {@link FederationPolicyInitializationContext},
- * then checks whether it needs to reinstantiate the class (null or
- * mismatching type), and reinitialize the policy.
- *
- * @param federationPolicyContext the current context
- * @param oldInstance the existing (possibly null) instance.
- *
- * @return a valid and fully reinitalized {@link FederationAMRMProxyPolicy}
- * instance
- *
- * @throws FederationPolicyInitializationException if the reinitalization is
- * not valid, and ensure
- * previous state is preserved
- */
- public FederationAMRMProxyPolicy getAMRMPolicy(
- FederationPolicyInitializationContext federationPolicyContext,
- FederationAMRMProxyPolicy oldInstance)
- throws FederationPolicyInitializationException {
-
- if (amrmProxyFederationPolicy == null) {
- throw new FederationPolicyInitializationException("The parameter "
- + "amrmProxyFederationPolicy should be initialized in "
- + this.getClass().getSimpleName() + " constructor.");
- }
-
- try {
- return (FederationAMRMProxyPolicy) internalPolicyGetter(
- federationPolicyContext, oldInstance, amrmProxyFederationPolicy);
- } catch (ClassCastException e) {
- throw new FederationPolicyInitializationException(e);
- }
-
- }
-
- /**
- * This default implementation validates the
- * {@link FederationPolicyInitializationContext},
- * then checks whether it needs to reinstantiate the class (null or
- * mismatching type), and reinitialize the policy.
- *
- * @param federationPolicyContext the current context
- * @param oldInstance the existing (possibly null) instance.
- *
- * @return a valid and fully reinitalized {@link FederationRouterPolicy}
- * instance
- *
- * @throws FederationPolicyInitializationException if the reinitalization is
- * not valid, and ensure
- * previous state is preserved
- */
-
- public FederationRouterPolicy getRouterPolicy(
- FederationPolicyInitializationContext federationPolicyContext,
- FederationRouterPolicy oldInstance)
- throws FederationPolicyInitializationException {
-
- //checks that sub-types properly initialize the types of policies
- if (routerFederationPolicy == null) {
- throw new FederationPolicyInitializationException("The policy "
- + "type should be initialized in " + this.getClass().getSimpleName()
- + " constructor.");
- }
-
- try {
- return (FederationRouterPolicy) internalPolicyGetter(
- federationPolicyContext, oldInstance, routerFederationPolicy);
- } catch (ClassCastException e) {
- throw new FederationPolicyInitializationException(e);
- }
- }
-
- @Override
- public String getQueue() {
- return queue;
- }
-
- @Override
- public void setQueue(String queue) {
- this.queue = queue;
- }
-
- /**
- * Common functionality to instantiate a reinitialize a {@link
- * ConfigurableFederationPolicy}.
- */
- private ConfigurableFederationPolicy internalPolicyGetter(
- final FederationPolicyInitializationContext federationPolicyContext,
- ConfigurableFederationPolicy oldInstance, Class policy)
- throws FederationPolicyInitializationException {
-
- FederationPolicyInitializationContextValidator
- .validate(federationPolicyContext, this.getClass().getCanonicalName());
-
- if (oldInstance == null || !oldInstance.getClass().equals(policy)) {
- try {
- oldInstance = (ConfigurableFederationPolicy) policy.newInstance();
- } catch (InstantiationException e) {
- throw new FederationPolicyInitializationException(e);
- } catch (IllegalAccessException e) {
- throw new FederationPolicyInitializationException(e);
- }
- }
-
- //copying the context to avoid side-effects
- FederationPolicyInitializationContext modifiedContext =
- updateContext(federationPolicyContext,
- oldInstance.getClass().getCanonicalName());
-
- oldInstance.reinitialize(modifiedContext);
- return oldInstance;
- }
-
- /**
- * This method is used to copy-on-write the context, that will be passed
- * downstream to the router/amrmproxy policies.
- */
- private FederationPolicyInitializationContext updateContext(
- FederationPolicyInitializationContext federationPolicyContext,
- String type) {
- // copying configuration and context to avoid modification of original
- SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration
- .newInstance(federationPolicyContext
- .getSubClusterPolicyConfiguration());
- newConf.setType(type);
-
- return new FederationPolicyInitializationContext(newConf,
- federationPolicyContext.getFederationSubclusterResolver(),
- federationPolicyContext.getFederationStateStoreFacade(),
- federationPolicyContext.getHomeSubcluster());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
deleted file mode 100644
index 39fdba3..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyManager.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-/**
- *
- * Implementors need to provide the ability to serliaze a policy and its
- * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide
- * (re)initialization mechanics for the underlying
- * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}.
- *
- * The serialization aspects are used by admin APIs or a policy engine to store
- * a serialized configuration in the {@code FederationStateStore}, while the
- * getters methods are used to obtain a propertly inizialized policy in the
- * {@code Router} and {@code AMRMProxy} respectively.
- *
- * This interface by design binds together {@link FederationAMRMProxyPolicy} and
- * {@link FederationRouterPolicy} and provide lifecycle support for
- * serialization and deserialization, to reduce configuration mistakes
- * (combining incompatible policies).
- *
- */
-public interface FederationPolicyManager {
-
- /**
- * If the current instance is compatible, this method returns the same
- * instance of {@link FederationAMRMProxyPolicy} reinitialized with the
- * current context, otherwise a new instance initialized with the current
- * context is provided. If the instance is compatible with the current class
- * the implementors should attempt to reinitalize (retaining state). To affect
- * a complete policy reset oldInstance should be null.
- *
- * @param policyContext the current context
- * @param oldInstance the existing (possibly null) instance.
- *
- * @return an updated {@link FederationAMRMProxyPolicy }.
- *
- * @throws FederationPolicyInitializationException if the initialization
- * cannot be completed properly. The oldInstance should be still
- * valid in case of failed initialization.
- */
- FederationAMRMProxyPolicy getAMRMPolicy(
- FederationPolicyInitializationContext policyContext,
- FederationAMRMProxyPolicy oldInstance)
- throws FederationPolicyInitializationException;
-
- /**
- * If the current instance is compatible, this method returns the same
- * instance of {@link FederationRouterPolicy} reinitialized with the current
- * context, otherwise a new instance initialized with the current context is
- * provided. If the instance is compatible with the current class the
- * implementors should attempt to reinitalize (retaining state). To affect a
- * complete policy reset oldInstance shoulb be set to null.
- *
- * @param policyContext the current context
- * @param oldInstance the existing (possibly null) instance.
- *
- * @return an updated {@link FederationRouterPolicy}.
- *
- * @throws FederationPolicyInitializationException if the initalization cannot
- * be completed properly. The oldInstance should be still valid in
- * case of failed initialization.
- */
- FederationRouterPolicy getRouterPolicy(
- FederationPolicyInitializationContext policyContext,
- FederationRouterPolicy oldInstance)
- throws FederationPolicyInitializationException;
-
- /**
- * This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
- * This is to be used when writing a policy object in the federation policy
- * store.
- *
- * @return a valid policy configuration representing this object
- * parametrization.
- *
- * @throws FederationPolicyInitializationException if the current state cannot
- * be serialized properly
- */
- SubClusterPolicyConfiguration serializeConf()
- throws FederationPolicyInitializationException;
-
- /**
- * This method returns the queue this policy is configured for.
- *
- * @return the name of the queue.
- */
- String getQueue();
-
- /**
- * This methods provides a setter for the queue this policy is specified for.
- *
- * @param queue the name of the queue.
- */
- void setQueue(String queue);
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
deleted file mode 100644
index ebdcf42..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/PriorityBroadcastPolicyManager.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import java.nio.ByteBuffer;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Policy that allows operator to configure "weights" for routing. This picks a
- * {@link PriorityRouterPolicy} for the router and a
- * {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to
- * work together.
- */
-public class PriorityBroadcastPolicyManager extends AbstractPolicyManager {
-
- private WeightedPolicyInfo weightedPolicyInfo;
-
- public PriorityBroadcastPolicyManager() {
- // this structurally hard-codes two compatible policies for Router and
- // AMRMProxy.
- routerFederationPolicy = PriorityRouterPolicy.class;
- amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
- weightedPolicyInfo = new WeightedPolicyInfo();
- }
-
- @Override
- public SubClusterPolicyConfiguration serializeConf()
- throws FederationPolicyInitializationException {
- ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
- return SubClusterPolicyConfiguration.newInstance(getQueue(),
- this.getClass().getCanonicalName(), buf);
- }
-
- @VisibleForTesting
- public WeightedPolicyInfo getWeightedPolicyInfo() {
- return weightedPolicyInfo;
- }
-
- @VisibleForTesting
- public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
- this.weightedPolicyInfo = weightedPolicyInfo;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
index a3fd15a..8c22623 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/RouterPolicyFacade.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
deleted file mode 100644
index a01f8fa..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-import java.nio.ByteBuffer;
-
-/**
- * This class represents a simple implementation of a {@code
- * FederationPolicyManager}.
- *
- * It combines the basic policies: {@link UniformRandomRouterPolicy} and
- * {@link BroadcastAMRMProxyPolicy}, which are designed to work together and
- * "spread" the load among sub-clusters uniformly.
- *
- * This simple policy might impose heavy load on the RMs and return more
- * containers than a job requested as all requests are (replicated and)
- * broadcasted.
- */
-public class UniformBroadcastPolicyManager
- extends AbstractPolicyManager {
-
- public UniformBroadcastPolicyManager() {
- //this structurally hard-codes two compatible policies for Router and
- // AMRMProxy.
- routerFederationPolicy = UniformRandomRouterPolicy.class;
- amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
- }
-
- @Override
- public SubClusterPolicyConfiguration serializeConf()
- throws FederationPolicyInitializationException {
- ByteBuffer buf = ByteBuffer.allocate(0);
- return SubClusterPolicyConfiguration
- .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
deleted file mode 100644
index f3c6673..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-
-import java.nio.ByteBuffer;
-
-/**
- * Policy that allows operator to configure "weights" for routing. This picks a
- * {@link WeightedRandomRouterPolicy} for the router and a {@link
- * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
- * work together.
- */
-public class WeightedLocalityPolicyManager
- extends AbstractPolicyManager {
-
- private WeightedPolicyInfo weightedPolicyInfo;
-
- public WeightedLocalityPolicyManager() {
- //this structurally hard-codes two compatible policies for Router and
- // AMRMProxy.
- routerFederationPolicy = WeightedRandomRouterPolicy.class;
- amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
- weightedPolicyInfo = new WeightedPolicyInfo();
- }
-
- @Override
- public SubClusterPolicyConfiguration serializeConf()
- throws FederationPolicyInitializationException {
- ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
- return SubClusterPolicyConfiguration
- .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
- }
-
- @VisibleForTesting
- public WeightedPolicyInfo getWeightedPolicyInfo() {
- return weightedPolicyInfo;
- }
-
- @VisibleForTesting
- public void setWeightedPolicyInfo(
- WeightedPolicyInfo weightedPolicyInfo) {
- this.weightedPolicyInfo = weightedPolicyInfo;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java
new file mode 100644
index 0000000..f7a89c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/AbstractPolicyManager.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.ConfigurableFederationPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class provides basic implementation for common methods that multiple
+ * policies will need to implement.
+ */
+public abstract class AbstractPolicyManager implements
+ FederationPolicyManager {
+
+ private String queue;
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected Class routerFederationPolicy;
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected Class amrmProxyFederationPolicy;
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(AbstractPolicyManager.class);
+ /**
+ * This default implementation validates the
+ * {@link FederationPolicyInitializationContext},
+ * then checks whether it needs to reinstantiate the class (null or
+ * mismatching type), and reinitialize the policy.
+ *
+ * @param federationPolicyContext the current context
+ * @param oldInstance the existing (possibly null) instance.
+ *
+ * @return a valid and fully reinitialized {@link FederationAMRMProxyPolicy}
+ * instance
+ *
+ * @throws FederationPolicyInitializationException if the reinitialization is
+ * not valid, and ensure
+ * previous state is preserved
+ */
+ public FederationAMRMProxyPolicy getAMRMPolicy(
+ FederationPolicyInitializationContext federationPolicyContext,
+ FederationAMRMProxyPolicy oldInstance)
+ throws FederationPolicyInitializationException {
+
+ if (amrmProxyFederationPolicy == null) {
+ throw new FederationPolicyInitializationException("The parameter "
+ + "amrmProxyFederationPolicy should be initialized in "
+ + this.getClass().getSimpleName() + " constructor.");
+ }
+
+ try {
+ return (FederationAMRMProxyPolicy) internalPolicyGetter(
+ federationPolicyContext, oldInstance, amrmProxyFederationPolicy);
+ } catch (ClassCastException e) {
+ throw new FederationPolicyInitializationException(e);
+ }
+
+ }
+
+ /**
+ * This default implementation validates the
+ * {@link FederationPolicyInitializationContext},
+ * then checks whether it needs to reinstantiate the class (null or
+ * mismatching type), and reinitialize the policy.
+ *
+ * @param federationPolicyContext the current context
+ * @param oldInstance the existing (possibly null) instance.
+ *
+ * @return a valid and fully reinitialized {@link FederationRouterPolicy}
+ * instance
+ *
+ * @throws FederationPolicyInitializationException if the reinitialization is
+ * not valid, and ensure
+ * previous state is preserved
+ */
+
+ public FederationRouterPolicy getRouterPolicy(
+ FederationPolicyInitializationContext federationPolicyContext,
+ FederationRouterPolicy oldInstance)
+ throws FederationPolicyInitializationException {
+
+ //checks that sub-types properly initialize the types of policies
+ if (routerFederationPolicy == null) {
+ throw new FederationPolicyInitializationException("The policy "
+ + "type should be initialized in " + this.getClass().getSimpleName()
+ + " constructor.");
+ }
+
+ try {
+ return (FederationRouterPolicy) internalPolicyGetter(
+ federationPolicyContext, oldInstance, routerFederationPolicy);
+ } catch (ClassCastException e) {
+ throw new FederationPolicyInitializationException(e);
+ }
+ }
+
+ @Override
+ public SubClusterPolicyConfiguration serializeConf()
+ throws FederationPolicyInitializationException {
+ // default implementation works only for sub-classes which do not require
+ // any parameters
+ ByteBuffer buf = ByteBuffer.allocate(0);
+ return SubClusterPolicyConfiguration
+ .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
+ }
+
+ @Override
+ public String getQueue() {
+ return queue;
+ }
+
+ @Override
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+
+ /**
+ * Common functionality to instantiate and reinitialize a {@link
+ * ConfigurableFederationPolicy}.
+ */
+ private ConfigurableFederationPolicy internalPolicyGetter(
+ final FederationPolicyInitializationContext federationPolicyContext,
+ ConfigurableFederationPolicy oldInstance, Class policy)
+ throws FederationPolicyInitializationException {
+
+ FederationPolicyInitializationContextValidator
+ .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+ if (oldInstance == null || !oldInstance.getClass().equals(policy)) {
+ try {
+ oldInstance = (ConfigurableFederationPolicy) policy.newInstance();
+ } catch (InstantiationException e) {
+ throw new FederationPolicyInitializationException(e);
+ } catch (IllegalAccessException e) {
+ throw new FederationPolicyInitializationException(e);
+ }
+ }
+
+ //copying the context to avoid side-effects
+ FederationPolicyInitializationContext modifiedContext =
+ updateContext(federationPolicyContext,
+ oldInstance.getClass().getCanonicalName());
+
+ oldInstance.reinitialize(modifiedContext);
+ return oldInstance;
+ }
+
+ /**
+ * This method is used to copy-on-write the context, that will be passed
+ * downstream to the router/amrmproxy policies.
+ */
+ private FederationPolicyInitializationContext updateContext(
+ FederationPolicyInitializationContext federationPolicyContext,
+ String type) {
+ // copying configuration and context to avoid modification of original
+ SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration
+ .newInstance(federationPolicyContext
+ .getSubClusterPolicyConfiguration());
+ newConf.setType(type);
+
+ return new FederationPolicyInitializationContext(newConf,
+ federationPolicyContext.getFederationSubclusterResolver(),
+ federationPolicyContext.getFederationStateStoreFacade(),
+ federationPolicyContext.getHomeSubcluster());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java
new file mode 100644
index 0000000..1434c80
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/FederationPolicyManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+/**
+ *
+ * Implementors need to provide the ability to serialize a policy and its
+ * configuration as a {@link SubClusterPolicyConfiguration}, as well as provide
+ * (re)initialization mechanics for the underlying
+ * {@link FederationAMRMProxyPolicy} and {@link FederationRouterPolicy}.
+ *
+ * The serialization aspects are used by admin APIs or a policy engine to store
+ * a serialized configuration in the {@code FederationStateStore}, while the
+ * getter methods are used to obtain a properly initialized policy in the
+ * {@code Router} and {@code AMRMProxy} respectively.
+ *
+ * This interface by design binds together {@link FederationAMRMProxyPolicy} and
+ * {@link FederationRouterPolicy} and provide lifecycle support for
+ * serialization and deserialization, to reduce configuration mistakes
+ * (combining incompatible policies).
+ *
+ */
+public interface FederationPolicyManager {
+
+ /**
+ * If the current instance is compatible, this method returns the same
+ * instance of {@link FederationAMRMProxyPolicy} reinitialized with the
+ * current context, otherwise a new instance initialized with the current
+ * context is provided. If the instance is compatible with the current class
+ * the implementors should attempt to reinitialize (retaining state). To
+ * effect a complete policy reset, oldInstance should be null.
+ *
+ * @param policyContext the current context
+ * @param oldInstance the existing (possibly null) instance.
+ *
+ * @return an updated {@link FederationAMRMProxyPolicy}.
+ *
+ * @throws FederationPolicyInitializationException if the initialization
+ * cannot be completed properly. The oldInstance should be still
+ * valid in case of failed initialization.
+ */
+ FederationAMRMProxyPolicy getAMRMPolicy(
+ FederationPolicyInitializationContext policyContext,
+ FederationAMRMProxyPolicy oldInstance)
+ throws FederationPolicyInitializationException;
+
+ /**
+ * If the current instance is compatible, this method returns the same
+ * instance of {@link FederationRouterPolicy} reinitialized with the current
+ * context, otherwise a new instance initialized with the current context is
+ * provided. If the instance is compatible with the current class the
+ * implementors should attempt to reinitialize (retaining state). To effect a
+ * complete policy reset oldInstance should be set to null.
+ *
+ * @param policyContext the current context
+ * @param oldInstance the existing (possibly null) instance.
+ *
+ * @return an updated {@link FederationRouterPolicy}.
+ *
+ * @throws FederationPolicyInitializationException if the initialization
+ * cannot be completed properly. The oldInstance should be still
+ * valid in case of failed initialization.
+ */
+ FederationRouterPolicy getRouterPolicy(
+ FederationPolicyInitializationContext policyContext,
+ FederationRouterPolicy oldInstance)
+ throws FederationPolicyInitializationException;
+
+ /**
+ * This method is invoked to derive a {@link SubClusterPolicyConfiguration}.
+ * This is to be used when writing a policy object in the federation policy
+ * store.
+ *
+ * @return a valid policy configuration representing this object
+ * parametrization.
+ *
+ * @throws FederationPolicyInitializationException if the current state cannot
+ * be serialized properly
+ */
+ SubClusterPolicyConfiguration serializeConf()
+ throws FederationPolicyInitializationException;
+
+ /**
+ * This method returns the queue this policy is configured for.
+ *
+ * @return the name of the queue.
+ */
+ String getQueue();
+
+ /**
+ * This method provides a setter for the queue this policy is specified for.
+ *
+ * @param queue the name of the queue.
+ */
+ void setQueue(String queue);
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java
new file mode 100644
index 0000000..08ab08f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/HashBroadcastPolicyManager.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.HashBasedRouterPolicy;
+
+/**
+ * Policy that routes applications via hashing of their queue name, and
+ * broadcasts resource requests. This picks a {@link HashBasedRouterPolicy} for
+ * the router and a {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they
+ * are designed to work together.
+ */
+public class HashBroadcastPolicyManager extends AbstractPolicyManager {
+
+ public HashBroadcastPolicyManager() {
+ // this structurally hard-codes two compatible policies for Router and
+ // AMRMProxy.
+ routerFederationPolicy = HashBasedRouterPolicy.class;
+ amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java
new file mode 100644
index 0000000..8139e12
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/PriorityBroadcastPolicyManager.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Policy that allows operator to configure "weights" for routing. This picks a
+ * {@link PriorityRouterPolicy} for the router and a
+ * {@link BroadcastAMRMProxyPolicy} for the amrmproxy as they are designed to
+ * work together.
+ */
+public class PriorityBroadcastPolicyManager extends AbstractPolicyManager {
+
+ private WeightedPolicyInfo weightedPolicyInfo;
+
+ public PriorityBroadcastPolicyManager() {
+ // this structurally hard-codes two compatible policies for Router and
+ // AMRMProxy.
+ routerFederationPolicy = PriorityRouterPolicy.class;
+ amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+ weightedPolicyInfo = new WeightedPolicyInfo();
+ }
+
+ @Override
+ public SubClusterPolicyConfiguration serializeConf()
+ throws FederationPolicyInitializationException {
+ ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
+ return SubClusterPolicyConfiguration.newInstance(getQueue(),
+ this.getClass().getCanonicalName(), buf);
+ }
+
+ @VisibleForTesting
+ public WeightedPolicyInfo getWeightedPolicyInfo() {
+ return weightedPolicyInfo;
+ }
+
+ @VisibleForTesting
+ public void setWeightedPolicyInfo(WeightedPolicyInfo weightedPolicyInfo) {
+ this.weightedPolicyInfo = weightedPolicyInfo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java
new file mode 100644
index 0000000..5db0466
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/UniformBroadcastPolicyManager.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
+
+/**
+ * This class represents a simple implementation of a {@code
+ * FederationPolicyManager}.
+ *
+ * It combines the basic policies: {@link UniformRandomRouterPolicy} and
+ * {@link BroadcastAMRMProxyPolicy}, which are designed to work together and
+ * "spread" the load among sub-clusters uniformly.
+ *
+ * This simple policy might impose heavy load on the RMs and return more
+ * containers than a job requested as all requests are (replicated and)
+ * broadcasted.
+ */
+public class UniformBroadcastPolicyManager extends AbstractPolicyManager {
+
+ public UniformBroadcastPolicyManager() {
+ // this structurally hard-codes two compatible policies for Router and
+ // AMRMProxy.
+ routerFederationPolicy = UniformRandomRouterPolicy.class;
+ amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
new file mode 100644
index 0000000..109b534
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/WeightedLocalityPolicyManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.manager;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Policy that allows operator to configure "weights" for routing. This picks a
+ * {@link WeightedRandomRouterPolicy} for the router and a {@link
+ * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
+ * work together.
+ */
+public class WeightedLocalityPolicyManager
+ extends AbstractPolicyManager {
+
+ private WeightedPolicyInfo weightedPolicyInfo;
+
+ public WeightedLocalityPolicyManager() {
+ //this structurally hard-codes two compatible policies for Router and
+ // AMRMProxy.
+ routerFederationPolicy = WeightedRandomRouterPolicy.class;
+ amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
+ weightedPolicyInfo = new WeightedPolicyInfo();
+ }
+
+ @Override
+ public SubClusterPolicyConfiguration serializeConf()
+ throws FederationPolicyInitializationException {
+ ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
+ return SubClusterPolicyConfiguration
+ .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
+ }
+
+ @VisibleForTesting
+ public WeightedPolicyInfo getWeightedPolicyInfo() {
+ return weightedPolicyInfo;
+ }
+
+ @VisibleForTesting
+ public void setWeightedPolicyInfo(
+ WeightedPolicyInfo weightedPolicyInfo) {
+ this.weightedPolicyInfo = weightedPolicyInfo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java
new file mode 100644
index 0000000..9515c01
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/manager/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Various implementations of FederationPolicyManager. **/
+package org.apache.hadoop.yarn.server.federation.policies.manager;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
index f49af1d..730fb41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/AbstractRouterPolicy.java
@@ -20,8 +20,11 @@ package org.apache.hadoop.yarn.server.federation.policies.router;
import java.util.Map;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.federation.policies.AbstractConfigurableFederationPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
@@ -44,4 +47,20 @@ public abstract class AbstractRouterPolicy extends
}
}
+ public void validate(ApplicationSubmissionContext appSubmissionContext)
+ throws FederationPolicyException {
+
+ if (appSubmissionContext == null) {
+ throw new FederationPolicyException(
+ "Cannot route an application with null context.");
+ }
+
+ // if the queue is not specified we set it to default value, to be
+ // compatible with YARN behavior.
+ String queue = appSubmissionContext.getQueue();
+ if (queue == null) {
+ appSubmissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
new file mode 100644
index 0000000..e40e87e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/HashBasedRouterPolicy.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies.router;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
+import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContextValidator;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
+
+/**
+ * This {@link FederationRouterPolicy} picks a subcluster based on the hash of
+ * the job's queue name. Useful to provide a default behavior when too many
+ * queues exist in a system. This also ensures that all jobs belonging to a
+ * queue are mapped to the same sub-cluster (likely helping with locality).
+ */
+public class HashBasedRouterPolicy extends AbstractRouterPolicy {
+
+ @Override
+ public void reinitialize(
+ FederationPolicyInitializationContext federationPolicyContext)
+ throws FederationPolicyInitializationException {
+ FederationPolicyInitializationContextValidator
+ .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+ // note: this overrides AbstractRouterPolicy and ignores the weights
+ setPolicyContext(federationPolicyContext);
+ }
+
+ /**
+ * Simply picks from alphabetically-sorted active subclusters based on the
+ * hash of the queue name. Jobs of the same queue will all be routed to the
+ * same sub-cluster, as long as the number of active sub-clusters and their
+ * names remain the same.
+ *
+ * @param appSubmissionContext the context for the app being submitted.
+ *
+ * @return a hash-based chosen subcluster.
+ *
+ * @throws YarnException if there are no active subclusters.
+ */
+ public SubClusterId getHomeSubcluster(
+ ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+
+ // throws if no active subclusters available
+ Map<SubClusterId, SubClusterInfo> activeSubclusters =
+ getActiveSubclusters();
+
+ validate(appSubmissionContext);
+
+ int chosenPosition = Math.abs(
+ appSubmissionContext.getQueue().hashCode() % activeSubclusters.size());
+
+ List<SubClusterId> list = new ArrayList<>(activeSubclusters.keySet());
+ Collections.sort(list);
+ return list.get(chosenPosition);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
index 5de749f..2ca15bf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/LoadBasedRouterPolicy.java
@@ -64,6 +64,9 @@ public class LoadBasedRouterPolicy extends AbstractRouterPolicy {
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ // null checks and default-queue behavior
+ validate(appSubmissionContext);
+
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
index bc3a1f7..13d9140 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/PriorityRouterPolicy.java
@@ -36,6 +36,9 @@ public class PriorityRouterPolicy extends AbstractRouterPolicy {
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ // null checks and default-queue behavior
+ validate(appSubmissionContext);
+
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
index b8f9cc3..d820449 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/UniformRandomRouterPolicy.java
@@ -48,11 +48,10 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
}
@Override
- public void reinitialize(
- FederationPolicyInitializationContext policyContext)
+ public void reinitialize(FederationPolicyInitializationContext policyContext)
throws FederationPolicyInitializationException {
- FederationPolicyInitializationContextValidator
- .validate(policyContext, this.getClass().getCanonicalName());
+ FederationPolicyInitializationContextValidator.validate(policyContext,
+ this.getClass().getCanonicalName());
// note: this overrides AbstractRouterPolicy and ignores the weights
@@ -73,6 +72,9 @@ public class UniformRandomRouterPolicy extends AbstractRouterPolicy {
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ // null checks and default-queue behavior
+ validate(appSubmissionContext);
+
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
index ac75ae9..5727134 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/router/WeightedRandomRouterPolicy.java
@@ -43,6 +43,9 @@ public class WeightedRandomRouterPolicy extends AbstractRouterPolicy {
public SubClusterId getHomeSubcluster(
ApplicationSubmissionContext appSubmissionContext) throws YarnException {
+ // null checks and default-queue behavior
+ validate(appSubmissionContext);
+
Map<SubClusterId, SubClusterInfo> activeSubclusters =
getActiveSubclusters();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
index ba897da..6bd8bf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BaseFederationPoliciesTest.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.federation.policies;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -35,8 +36,10 @@ import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPo
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.junit.Test;
@@ -46,7 +49,7 @@ import org.junit.Test;
public abstract class BaseFederationPoliciesTest {
private ConfigurableFederationPolicy policy;
- private WeightedPolicyInfo policyInfo;
+ private WeightedPolicyInfo policyInfo = mock(WeightedPolicyInfo.class);
private Map<SubClusterId, SubClusterInfo> activeSubclusters = new HashMap<>();
private FederationPolicyInitializationContext federationPolicyContext;
private ApplicationSubmissionContext applicationSubmissionContext =
@@ -103,7 +106,7 @@ public abstract class BaseFederationPoliciesTest {
((FederationRouterPolicy) localPolicy)
.getHomeSubcluster(getApplicationSubmissionContext());
} else {
- String[] hosts = new String[] {"host1", "host2" };
+ String[] hosts = new String[] {"host1", "host2"};
List<ResourceRequest> resourceRequests = FederationPoliciesTestUtil
.createResourceRequests(hosts, 2 * 1024, 2, 1, 3, null, false);
((FederationAMRMProxyPolicy) localPolicy)
@@ -170,4 +173,14 @@ public abstract class BaseFederationPoliciesTest {
this.homeSubCluster = homeSubCluster;
}
+ public void setMockActiveSubclusters(int numSubclusters) {
+ for (int i = 1; i <= numSubclusters; i++) {
+ SubClusterIdInfo sc = new SubClusterIdInfo("sc" + i);
+ SubClusterInfo sci = mock(SubClusterInfo.class);
+ when(sci.getState()).thenReturn(SubClusterState.SC_RUNNING);
+ when(sci.getSubClusterId()).thenReturn(sc.toId());
+ getActiveSubclusters().put(sc.toId(), sci);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
deleted file mode 100644
index c609886..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
-import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
-import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * This class provides common test methods for testing {@code
- * FederationPolicyManager}s.
- */
-public abstract class BasePolicyManagerTest {
-
-
- @SuppressWarnings("checkstyle:visibilitymodifier")
- protected FederationPolicyManager wfp = null;
- @SuppressWarnings("checkstyle:visibilitymodifier")
- protected Class expectedPolicyManager;
- @SuppressWarnings("checkstyle:visibilitymodifier")
- protected Class expectedAMRMProxyPolicy;
- @SuppressWarnings("checkstyle:visibilitymodifier")
- protected Class expectedRouterPolicy;
-
-
- @Test
- public void testSerializeAndInstantiate() throws Exception {
- serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
- expectedAMRMProxyPolicy,
- expectedRouterPolicy);
- }
-
- @Test(expected = FederationPolicyInitializationException.class)
- public void testSerializeAndInstantiateBad1() throws Exception {
- serializeAndDeserializePolicyManager(wfp, String.class,
- expectedAMRMProxyPolicy, expectedRouterPolicy);
- }
-
- @Test(expected = AssertionError.class)
- public void testSerializeAndInstantiateBad2() throws Exception {
- serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
- String.class, expectedRouterPolicy);
- }
-
- @Test(expected = AssertionError.class)
- public void testSerializeAndInstantiateBad3() throws Exception {
- serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
- expectedAMRMProxyPolicy, String.class);
- }
-
- protected static void serializeAndDeserializePolicyManager(
- FederationPolicyManager wfp, Class policyManagerType,
- Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
-
- // serializeConf it in a context
- SubClusterPolicyConfiguration fpc =
- wfp.serializeConf();
- fpc.setType(policyManagerType.getCanonicalName());
- FederationPolicyInitializationContext context = new
- FederationPolicyInitializationContext();
- context.setSubClusterPolicyConfiguration(fpc);
- context
- .setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
- context.setFederationSubclusterResolver(
- FederationPoliciesTestUtil.initResolver());
- context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster"));
-
- // based on the "context" created instantiate new class and use it
- Class c = Class.forName(wfp.getClass().getCanonicalName());
- FederationPolicyManager wfp2 = (FederationPolicyManager) c.newInstance();
-
- FederationAMRMProxyPolicy federationAMRMProxyPolicy =
- wfp2.getAMRMPolicy(context, null);
-
- //needed only for tests (getARMRMPolicy change the "type" in conf)
- fpc.setType(wfp.getClass().getCanonicalName());
-
- FederationRouterPolicy federationRouterPolicy =
- wfp2.getRouterPolicy(context, null);
-
- Assert.assertEquals(federationAMRMProxyPolicy.getClass(),
- expAMRMProxyPolicy);
-
- Assert.assertEquals(federationRouterPolicy.getClass(),
- expRouterPolicy);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
index d906b92..611a486 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.manager.FederationPolicyManager;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/955faca4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
deleted file mode 100644
index 5e5bc83..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestPriorityBroadcastPolicyManager.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.yarn.server.federation.policies;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
-import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
-import org.apache.hadoop.yarn.server.federation.policies.router.PriorityRouterPolicy;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
-import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Simple test of {@link PriorityBroadcastPolicyManager}.
- */
-public class TestPriorityBroadcastPolicyManager extends BasePolicyManagerTest {
-
- private WeightedPolicyInfo policyInfo;
-
- @Before
- public void setup() {
- // configure a policy
-
- wfp = new PriorityBroadcastPolicyManager();
- wfp.setQueue("queue1");
- SubClusterId sc1 = SubClusterId.newInstance("sc1");
- SubClusterId sc2 = SubClusterId.newInstance("sc2");
- policyInfo = new WeightedPolicyInfo();
-
- Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
- routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
- routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
- policyInfo.setRouterPolicyWeights(routerWeights);
-
- ((PriorityBroadcastPolicyManager) wfp).setWeightedPolicyInfo(policyInfo);
-
- // set expected params that the base test class will use for tests
- expectedPolicyManager = PriorityBroadcastPolicyManager.class;
- expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
- expectedRouterPolicy = PriorityRouterPolicy.class;
- }
-
- @Test
- public void testPolicyInfoSetCorrectly() throws Exception {
- serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
- expectedAMRMProxyPolicy, expectedRouterPolicy);
-
- // check the policyInfo propagates through ser/der correctly
- Assert.assertEquals(
- ((PriorityBroadcastPolicyManager) wfp).getWeightedPolicyInfo(),
- policyInfo);
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org