You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2017/02/15 00:57:09 UTC
[32/50] [abbrv] hadoop git commit: YARN-5391. PolicyManager to tie
together Router/AMRM Federation policies. (Carlo Curino via Subru).
YARN-5391. PolicyManager to tie together Router/AMRM Federation policies. (Carlo Curino via Subru).
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/112b99b9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/112b99b9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/112b99b9
Branch: refs/heads/YARN-2915
Commit: 112b99b944186956cdfd36cf57c58a41cd8f94c9
Parents: ed1764c
Author: Subru Krishnan <su...@apache.org>
Authored: Tue Nov 1 19:54:18 2016 -0700
Committer: Subru Krishnan <su...@apache.org>
Committed: Tue Feb 14 16:01:14 2017 -0800
----------------------------------------------------------------------
.../policies/AbstractPolicyManager.java | 175 +++++++++++++++++++
.../FederationPolicyInitializationContext.java | 3 +-
.../policies/UniformBroadcastPolicyManager.java | 56 ++++++
.../policies/WeightedLocalityPolicyManager.java | 67 +++++++
.../records/SubClusterPolicyConfiguration.java | 13 ++
.../policies/BasePolicyManagerTest.java | 108 ++++++++++++
...ionPolicyInitializationContextValidator.java | 5 +-
.../TestUniformBroadcastPolicyManager.java | 40 +++++
.../TestWeightedLocalityPolicyManager.java | 79 +++++++++
.../utils/FederationPoliciesTestUtil.java | 2 +-
10 files changed, 545 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
new file mode 100644
index 0000000..e77f2e3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/AbstractPolicyManager.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides basic implementation for common methods that multiple
+ * policies will need to implement.
+ */
+public abstract class AbstractPolicyManager implements
+ FederationPolicyManager {
+
+ private String queue;
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected Class routerFederationPolicy;
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected Class amrmProxyFederationPolicy;
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(AbstractPolicyManager.class);
+ /**
+ * This default implementation validates the
+ * {@link FederationPolicyInitializationContext},
+ * then checks whether it needs to reinstantiate the class (null or
+ * mismatching type), and reinitialize the policy.
+ *
+ * @param federationPolicyContext the current context
+ * @param oldInstance the existing (possibly null) instance.
+ *
+ * @return a valid and fully reinitialized {@link FederationAMRMProxyPolicy}
+ * instance
+ *
+ * @throws FederationPolicyInitializationException if the reinitialization is
+ * not valid, and ensure
+ * previous state is preserved
+ */
+ public FederationAMRMProxyPolicy getAMRMPolicy(
+ FederationPolicyInitializationContext federationPolicyContext,
+ FederationAMRMProxyPolicy oldInstance)
+ throws FederationPolicyInitializationException {
+
+ if (amrmProxyFederationPolicy == null) {
+ throw new FederationPolicyInitializationException("The parameter "
+ + "amrmProxyFederationPolicy should be initialized in "
+ + this.getClass().getSimpleName() + " constructor.");
+ }
+
+ try {
+ return (FederationAMRMProxyPolicy) internalPolicyGetter(
+ federationPolicyContext, oldInstance, amrmProxyFederationPolicy);
+ } catch (ClassCastException e) {
+ throw new FederationPolicyInitializationException(e);
+ }
+
+ }
+
+ /**
+ * This default implementation validates the
+ * {@link FederationPolicyInitializationContext},
+ * then checks whether it needs to reinstantiate the class (null or
+ * mismatching type), and reinitialize the policy.
+ *
+ * @param federationPolicyContext the current context
+ * @param oldInstance the existing (possibly null) instance.
+ *
+ * @return a valid and fully reinitialized {@link FederationRouterPolicy}
+ * instance
+ *
+ * @throws FederationPolicyInitializationException if the reinitialization is
+ * not valid, and ensure
+ * previous state is preserved
+ */
+
+ public FederationRouterPolicy getRouterPolicy(
+ FederationPolicyInitializationContext federationPolicyContext,
+ FederationRouterPolicy oldInstance)
+ throws FederationPolicyInitializationException {
+
+ //checks that sub-types properly initialize the types of policies
+ if (routerFederationPolicy == null) {
+ throw new FederationPolicyInitializationException("The policy "
+ + "type should be initialized in " + this.getClass().getSimpleName()
+ + " constructor.");
+ }
+
+ try {
+ return (FederationRouterPolicy) internalPolicyGetter(
+ federationPolicyContext, oldInstance, routerFederationPolicy);
+ } catch (ClassCastException e) {
+ throw new FederationPolicyInitializationException(e);
+ }
+ }
+
+ @Override
+ public String getQueue() {
+ return queue;
+ }
+
+ @Override
+ public void setQueue(String queue) {
+ this.queue = queue;
+ }
+
+ /**
+ * Common functionality to instantiate or reinitialize a {@link
+ * ConfigurableFederationPolicy}.
+ */
+ private ConfigurableFederationPolicy internalPolicyGetter(
+ final FederationPolicyInitializationContext federationPolicyContext,
+ ConfigurableFederationPolicy oldInstance, Class policy)
+ throws FederationPolicyInitializationException {
+
+ FederationPolicyInitializationContextValidator
+ .validate(federationPolicyContext, this.getClass().getCanonicalName());
+
+ if (oldInstance == null || !oldInstance.getClass().equals(policy)) {
+ try {
+ oldInstance = (ConfigurableFederationPolicy) policy.newInstance();
+ } catch (InstantiationException e) {
+ throw new FederationPolicyInitializationException(e);
+ } catch (IllegalAccessException e) {
+ throw new FederationPolicyInitializationException(e);
+ }
+ }
+
+ //copying the context to avoid side-effects
+ FederationPolicyInitializationContext modifiedContext =
+ updateContext(federationPolicyContext,
+ oldInstance.getClass().getCanonicalName());
+
+ oldInstance.reinitialize(modifiedContext);
+ return oldInstance;
+ }
+
+ /**
+ * This method is used to copy-on-write the context, that will be passed
+ * downstream to the router/amrmproxy policies.
+ */
+ private FederationPolicyInitializationContext updateContext(
+ FederationPolicyInitializationContext federationPolicyContext,
+ String type) {
+ // copying configuration and context to avoid modification of original
+ SubClusterPolicyConfiguration newConf = SubClusterPolicyConfiguration
+ .newInstance(federationPolicyContext
+ .getSubClusterPolicyConfiguration());
+ newConf.setType(type);
+
+ return new FederationPolicyInitializationContext(newConf,
+ federationPolicyContext.getFederationSubclusterResolver(),
+ federationPolicyContext.getFederationStateStoreFacade(),
+ federationPolicyContext.getHomeSubcluster());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
index 46dd6eb..4d29a41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/FederationPolicyInitializationContext.java
@@ -41,10 +41,11 @@ public class FederationPolicyInitializationContext {
public FederationPolicyInitializationContext(
SubClusterPolicyConfiguration policy, SubClusterResolver resolver,
- FederationStateStoreFacade storeFacade) {
+ FederationStateStoreFacade storeFacade, SubClusterId home) {
this.federationPolicyConfiguration = policy;
this.federationSubclusterResolver = resolver;
this.federationStateStoreFacade = storeFacade;
+ this.homeSubcluster = home;
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
new file mode 100644
index 0000000..a01f8fa
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/UniformBroadcastPolicyManager.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This class represents a simple implementation of a {@code
+ * FederationPolicyManager}.
+ *
+ * It combines the basic policies: {@link UniformRandomRouterPolicy} and
+ * {@link BroadcastAMRMProxyPolicy}, which are designed to work together and
+ * "spread" the load among sub-clusters uniformly.
+ *
+ * This simple policy might impose heavy load on the RMs and return more
+ * containers than a job requested as all requests are (replicated and)
+ * broadcasted.
+ */
+public class UniformBroadcastPolicyManager
+ extends AbstractPolicyManager {
+
+ public UniformBroadcastPolicyManager() {
+ //this structurally hard-codes two compatible policies for Router and
+ // AMRMProxy.
+ routerFederationPolicy = UniformRandomRouterPolicy.class;
+ amrmProxyFederationPolicy = BroadcastAMRMProxyPolicy.class;
+ }
+
+ @Override
+ public SubClusterPolicyConfiguration serializeConf()
+ throws FederationPolicyInitializationException {
+ ByteBuffer buf = ByteBuffer.allocate(0);
+ return SubClusterPolicyConfiguration
+ .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
new file mode 100644
index 0000000..f3c6673
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/WeightedLocalityPolicyManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Policy that allows operator to configure "weights" for routing. This picks a
+ * {@link WeightedRandomRouterPolicy} for the router and a {@link
+ * LocalityMulticastAMRMProxyPolicy} for the amrmproxy as they are designed to
+ * work together.
+ */
+public class WeightedLocalityPolicyManager
+ extends AbstractPolicyManager {
+
+ private WeightedPolicyInfo weightedPolicyInfo;
+
+ public WeightedLocalityPolicyManager() {
+ //this structurally hard-codes two compatible policies for Router and
+ // AMRMProxy.
+ routerFederationPolicy = WeightedRandomRouterPolicy.class;
+ amrmProxyFederationPolicy = LocalityMulticastAMRMProxyPolicy.class;
+ weightedPolicyInfo = new WeightedPolicyInfo();
+ }
+
+ @Override
+ public SubClusterPolicyConfiguration serializeConf()
+ throws FederationPolicyInitializationException {
+ ByteBuffer buf = weightedPolicyInfo.toByteBuffer();
+ return SubClusterPolicyConfiguration
+ .newInstance(getQueue(), this.getClass().getCanonicalName(), buf);
+ }
+
+ @VisibleForTesting
+ public WeightedPolicyInfo getWeightedPolicyInfo() {
+ return weightedPolicyInfo;
+ }
+
+ @VisibleForTesting
+ public void setWeightedPolicyInfo(
+ WeightedPolicyInfo weightedPolicyInfo) {
+ this.weightedPolicyInfo = weightedPolicyInfo;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
index 2839139..52807d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/records/SubClusterPolicyConfiguration.java
@@ -40,6 +40,7 @@ import java.nio.ByteBuffer;
@Unstable
public abstract class SubClusterPolicyConfiguration {
+
@Private
@Unstable
public static SubClusterPolicyConfiguration newInstance(String queue,
@@ -52,6 +53,18 @@ public abstract class SubClusterPolicyConfiguration {
return policy;
}
+ @Private
+ @Unstable
+ public static SubClusterPolicyConfiguration newInstance(
+ SubClusterPolicyConfiguration conf) {
+ SubClusterPolicyConfiguration policy =
+ Records.newRecord(SubClusterPolicyConfiguration.class);
+ policy.setQueue(conf.getQueue());
+ policy.setType(conf.getType());
+ policy.setParams(conf.getParams());
+ return policy;
+ }
+
/**
* Get the name of the queue for which we are configuring a policy.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
new file mode 100644
index 0000000..c609886
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/BasePolicyManagerTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
+import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
+import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * This class provides common test methods for testing {@code
+ * FederationPolicyManager}s.
+ */
+public abstract class BasePolicyManagerTest {
+
+
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected FederationPolicyManager wfp = null;
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected Class expectedPolicyManager;
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected Class expectedAMRMProxyPolicy;
+ @SuppressWarnings("checkstyle:visibilitymodifier")
+ protected Class expectedRouterPolicy;
+
+
+ @Test
+ public void testSerializeAndInstantiate() throws Exception {
+ serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+ expectedAMRMProxyPolicy,
+ expectedRouterPolicy);
+ }
+
+ @Test(expected = FederationPolicyInitializationException.class)
+ public void testSerializeAndInstantiateBad1() throws Exception {
+ serializeAndDeserializePolicyManager(wfp, String.class,
+ expectedAMRMProxyPolicy, expectedRouterPolicy);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testSerializeAndInstantiateBad2() throws Exception {
+ serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+ String.class, expectedRouterPolicy);
+ }
+
+ @Test(expected = AssertionError.class)
+ public void testSerializeAndInstantiateBad3() throws Exception {
+ serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+ expectedAMRMProxyPolicy, String.class);
+ }
+
+ protected static void serializeAndDeserializePolicyManager(
+ FederationPolicyManager wfp, Class policyManagerType,
+ Class expAMRMProxyPolicy, Class expRouterPolicy) throws Exception {
+
+ // serializeConf it in a context
+ SubClusterPolicyConfiguration fpc =
+ wfp.serializeConf();
+ fpc.setType(policyManagerType.getCanonicalName());
+ FederationPolicyInitializationContext context = new
+ FederationPolicyInitializationContext();
+ context.setSubClusterPolicyConfiguration(fpc);
+ context
+ .setFederationStateStoreFacade(FederationPoliciesTestUtil.initFacade());
+ context.setFederationSubclusterResolver(
+ FederationPoliciesTestUtil.initResolver());
+ context.setHomeSubcluster(SubClusterId.newInstance("homesubcluster"));
+
+ // based on the "context" created instantiate new class and use it
+ Class c = Class.forName(wfp.getClass().getCanonicalName());
+ FederationPolicyManager wfp2 = (FederationPolicyManager) c.newInstance();
+
+ FederationAMRMProxyPolicy federationAMRMProxyPolicy =
+ wfp2.getAMRMPolicy(context, null);
+
+ //needed only for tests (getAMRMPolicy changes the "type" in conf)
+ fpc.setType(wfp.getClass().getCanonicalName());
+
+ FederationRouterPolicy federationRouterPolicy =
+ wfp2.getRouterPolicy(context, null);
+
+ Assert.assertEquals(federationAMRMProxyPolicy.getClass(),
+ expAMRMProxyPolicy);
+
+ Assert.assertEquals(federationRouterPolicy.getClass(),
+ expRouterPolicy);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
index c79fd2a..d906b92 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestFederationPolicyInitializationContextValidator.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMR
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.router.FederationRouterPolicy;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.utils.FederationPoliciesTestUtil;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
@@ -38,6 +39,7 @@ public class TestFederationPolicyInitializationContextValidator {
private SubClusterPolicyConfiguration goodConfig;
private SubClusterResolver goodSR;
private FederationStateStoreFacade goodFacade;
+ private SubClusterId goodHome;
private FederationPolicyInitializationContext context;
@Before
@@ -45,8 +47,9 @@ public class TestFederationPolicyInitializationContextValidator {
goodFacade = FederationPoliciesTestUtil.initFacade();
goodConfig = new MockPolicyManager().serializeConf();
goodSR = FederationPoliciesTestUtil.initResolver();
+ goodHome = SubClusterId.newInstance("homesubcluster");
context = new FederationPolicyInitializationContext(goodConfig, goodSR,
- goodFacade);
+ goodFacade, goodHome);
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java
new file mode 100644
index 0000000..542a5ae
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestUniformBroadcastPolicyManager.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.BroadcastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.router.UniformRandomRouterPolicy;
+import org.junit.Before;
+
+/**
+ * Simple test of {@link UniformBroadcastPolicyManager}.
+ */
+public class TestUniformBroadcastPolicyManager extends BasePolicyManagerTest {
+
+ @Before
+ public void setup() {
+ //config policy
+ wfp = new UniformBroadcastPolicyManager();
+ wfp.setQueue("queue1");
+
+ //set expected params that the base test class will use for tests
+ expectedPolicyManager = UniformBroadcastPolicyManager.class;
+ expectedAMRMProxyPolicy = BroadcastAMRMProxyPolicy.class;
+ expectedRouterPolicy = UniformRandomRouterPolicy.class;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java
new file mode 100644
index 0000000..ab9cec4
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/policies/TestWeightedLocalityPolicyManager.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.federation.policies;
+
+import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.LocalityMulticastAMRMProxyPolicy;
+import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
+import org.apache.hadoop.yarn.server.federation.policies.router.WeightedRandomRouterPolicy;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
+import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple test of {@link WeightedLocalityPolicyManager}.
+ */
+public class TestWeightedLocalityPolicyManager extends
+ BasePolicyManagerTest {
+
+ private WeightedPolicyInfo policyInfo;
+
+ @Before
+ public void setup() {
+ // configure a policy
+
+ wfp = new WeightedLocalityPolicyManager();
+ wfp.setQueue("queue1");
+ SubClusterId sc1 = SubClusterId.newInstance("sc1");
+ SubClusterId sc2 = SubClusterId.newInstance("sc2");
+ policyInfo = new WeightedPolicyInfo();
+
+ Map<SubClusterIdInfo, Float> routerWeights = new HashMap<>();
+ routerWeights.put(new SubClusterIdInfo(sc1), 0.2f);
+ routerWeights.put(new SubClusterIdInfo(sc2), 0.8f);
+ policyInfo.setRouterPolicyWeights(routerWeights);
+
+ Map<SubClusterIdInfo, Float> amrmWeights = new HashMap<>();
+ amrmWeights.put(new SubClusterIdInfo(sc1), 0.2f);
+ amrmWeights.put(new SubClusterIdInfo(sc2), 0.8f);
+ policyInfo.setAMRMPolicyWeights(amrmWeights);
+
+ ((WeightedLocalityPolicyManager) wfp).setWeightedPolicyInfo(
+ policyInfo);
+
+ //set expected params that the base test class will use for tests
+ expectedPolicyManager = WeightedLocalityPolicyManager.class;
+ expectedAMRMProxyPolicy = LocalityMulticastAMRMProxyPolicy.class;
+ expectedRouterPolicy = WeightedRandomRouterPolicy.class;
+ }
+
+ @Test
+ public void testPolicyInfoSetCorrectly() throws Exception {
+ serializeAndDeserializePolicyManager(wfp, expectedPolicyManager,
+ expectedAMRMProxyPolicy,
+ expectedRouterPolicy);
+
+ //check the policyInfo propagates through ser/der correctly
+ Assert.assertEquals(((WeightedLocalityPolicyManager) wfp)
+ .getWeightedPolicyInfo(), policyInfo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/112b99b9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
index 87ed8d1..85fdc96 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/federation/utils/FederationPoliciesTestUtil.java
@@ -143,7 +143,7 @@ public final class FederationPoliciesTestUtil {
SubClusterInfo> activeSubclusters) throws YarnException {
FederationPolicyInitializationContext context =
new FederationPolicyInitializationContext(null, initResolver(),
- initFacade());
+ initFacade(), SubClusterId.newInstance("homesubcluster"));
initializePolicyContext(context, policy, policyInfo, activeSubclusters);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org