You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/31 15:57:38 UTC
[14/32] hadoop git commit: YARN-6596. Introduce Placement Constraint
Manager module. (Konstantinos Karanasos via asuresh)
YARN-6596. Introduce Placement Constraint Manager module. (Konstantinos Karanasos via asuresh)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1efb2b6f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1efb2b6f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1efb2b6f
Branch: refs/heads/trunk
Commit: 1efb2b6f250022f41fe5911c1bb3028ec15c5447
Parents: 37f1a7b
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 22 13:26:30 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 31 01:30:17 2018 -0800
----------------------------------------------------------------------
.../resourcemanager/RMActiveServiceContext.java | 15 +
.../yarn/server/resourcemanager/RMContext.java | 6 +
.../server/resourcemanager/RMContextImpl.java | 13 +
.../server/resourcemanager/ResourceManager.java | 13 +
.../MemoryPlacementConstraintManager.java | 282 +++++++++++++++++++
.../constraint/PlacementConstraintManager.java | 151 ++++++++++
.../PlacementConstraintManagerService.java | 93 ++++++
.../scheduler/constraint/package-info.java | 29 ++
.../TestPlacementConstraintManagerService.java | 182 ++++++++++++
9 files changed, 784 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb2b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 4d0c230..06a1d00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
@@ -109,6 +110,7 @@ public class RMActiveServiceContext {
private RMAppLifetimeMonitor rmAppLifetimeMonitor;
private QueueLimitCalculator queueLimitCalculator;
private AllocationTagsManager allocationTagsManager;
+ private PlacementConstraintManager placementConstraintManager;
public RMActiveServiceContext() {
queuePlacementManager = new PlacementManager();
@@ -413,6 +415,19 @@ public class RMActiveServiceContext {
@Private
@Unstable
+ public PlacementConstraintManager getPlacementConstraintManager() {
+ return placementConstraintManager;
+ }
+
+ @Private
+ @Unstable
+ public void setPlacementConstraintManager(
+ PlacementConstraintManager placementConstraintManager) {
+ this.placementConstraintManager = placementConstraintManager;
+ }
+
+ @Private
+ @Unstable
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return rmDelegatedNodeLabelsUpdater;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb2b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 00da108..eb91a31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@@ -171,4 +172,9 @@ public interface RMContext extends ApplicationMasterServiceContext {
AllocationTagsManager getAllocationTagsManager();
void setAllocationTagsManager(AllocationTagsManager allocationTagsManager);
+
+ PlacementConstraintManager getPlacementConstraintManager();
+
+ void setPlacementConstraintManager(
+ PlacementConstraintManager placementConstraintManager);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb2b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index da50ef8..0b6be72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
@@ -516,6 +517,18 @@ public class RMContextImpl implements RMContext {
}
@Override
+ public PlacementConstraintManager getPlacementConstraintManager() {
+ return activeServiceContext.getPlacementConstraintManager();
+ }
+
+ @Override
+ public void setPlacementConstraintManager(
+ PlacementConstraintManager placementConstraintManager) {
+ activeServiceContext
+ .setPlacementConstraintManager(placementConstraintManager);
+ }
+
+ @Override
public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
return activeServiceContext.getRMDelegatedNodeLabelsUpdater();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb2b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 1d838f0..5140c9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -97,6 +97,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -498,6 +500,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
protected AllocationTagsManager createAllocationTagsManager() {
return new AllocationTagsManager(this.rmContext);
}
+
+ protected PlacementConstraintManagerService
+ createPlacementConstraintManager() {
+ // Use the in memory Placement Constraint Manager.
+ return new MemoryPlacementConstraintManager();
+ }
protected DelegationTokenRenewer createDelegationTokenRenewer() {
return new DelegationTokenRenewer();
@@ -628,6 +636,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
createAllocationTagsManager();
rmContext.setAllocationTagsManager(allocationTagsManager);
+ PlacementConstraintManagerService placementConstraintManager =
+ createPlacementConstraintManager();
+ addService(placementConstraintManager);
+ rmContext.setPlacementConstraintManager(placementConstraintManager);
+
RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
createRMDelegatedNodeLabelsUpdater();
if (delegatedNodeLabelsUpdater != null) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb2b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
new file mode 100644
index 0000000..ceff6f6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
@@ -0,0 +1,282 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In memory implementation of the {@link PlacementConstraintManagerService}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MemoryPlacementConstraintManager
+ extends PlacementConstraintManagerService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MemoryPlacementConstraintManager.class);
+
+ private ReentrantReadWriteLock.ReadLock readLock;
+ private ReentrantReadWriteLock.WriteLock writeLock;
+
+ /**
+ * Stores the global constraints that will be manipulated by the cluster
+ * admin. The key of each entry is the tag that will enable the corresponding
+ * constraint.
+ */
+ private Map<String, PlacementConstraint> globalConstraints;
+ /**
+ * Stores the constraints for each application, along with the allocation tags
+ * that will enable each of the constraints for a given application.
+ */
+ private Map<ApplicationId, Map<String, PlacementConstraint>> appConstraints;
+
+ public MemoryPlacementConstraintManager() {
+ this.globalConstraints = new HashMap<>();
+ this.appConstraints = new HashMap<>();
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+ }
+
+ @Override
+ protected void serviceInit(Configuration conf) throws Exception {
+ super.serviceInit(conf);
+ }
+
+ @Override
+ public void registerApplication(ApplicationId appId,
+ Map<Set<String>, PlacementConstraint> constraintMap) {
+ // Check if app already exists. If not, prepare its constraint map.
+ Map<String, PlacementConstraint> constraintsForApp = new HashMap<>();
+ try {
+ readLock.lock();
+ if (appConstraints.get(appId) != null) {
+ LOG.warn("Application {} has already been registered.", appId);
+ return;
+ }
+ // Go over each sourceTag-constraint pair, validate it, and add it to the
+ // constraint map for this app.
+ for (Map.Entry<Set<String>, PlacementConstraint> entry : constraintMap
+ .entrySet()) {
+ Set<String> sourceTags = entry.getKey();
+ PlacementConstraint constraint = entry.getValue();
+ if (validateConstraint(sourceTags, constraint)) {
+ String sourceTag = getValidSourceTag(sourceTags);
+ constraintsForApp.put(sourceTag, constraint);
+ }
+ }
+ } finally {
+ readLock.unlock();
+ }
+
+ if (constraintsForApp.isEmpty()) {
+ LOG.info("Application {} was registered, but no constraints were added.",
+ appId);
+ }
+ // Update appConstraints.
+ try {
+ writeLock.lock();
+ appConstraints.put(appId, constraintsForApp);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void addConstraint(ApplicationId appId, Set<String> sourceTags,
+ PlacementConstraint placementConstraint, boolean replace) {
+ try {
+ writeLock.lock();
+ Map<String, PlacementConstraint> constraintsForApp =
+ appConstraints.get(appId);
+ if (constraintsForApp == null) {
+ LOG.info("Cannot add constraint to application {}, as it has not "
+ + "been registered yet.", appId);
+ return;
+ }
+
+ addConstraintToMap(constraintsForApp, sourceTags, placementConstraint,
+ replace);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void addGlobalConstraint(Set<String> sourceTags,
+ PlacementConstraint placementConstraint, boolean replace) {
+ try {
+ writeLock.lock();
+ addConstraintToMap(globalConstraints, sourceTags, placementConstraint,
+ replace);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Helper method that adds a constraint to a map for a given source tag.
+ * Assumes there is already a lock on the constraint map.
+ *
+ * @param constraintMap constraint map to which the constraint will be added
+ * @param sourceTags the source tags that will enable this constraint
+ * @param placementConstraint the new constraint to be added
+ * @param replace if true, an existing constraint for these sourceTags will be
+ * replaced with the new one
+ */
+ private void addConstraintToMap(
+ Map<String, PlacementConstraint> constraintMap, Set<String> sourceTags,
+ PlacementConstraint placementConstraint, boolean replace) {
+ if (validateConstraint(sourceTags, placementConstraint)) {
+ String sourceTag = getValidSourceTag(sourceTags);
+ if (constraintMap.get(sourceTag) == null || replace) {
+ if (replace) {
+ LOG.info("Replacing the constraint associated with tag {} with {}.",
+ sourceTag, placementConstraint);
+ }
+ constraintMap.put(sourceTag, placementConstraint);
+ } else {
+ LOG.info("Constraint {} will not be added. There is already a "
+ + "constraint associated with tag {}.",
+ placementConstraint, sourceTag);
+ }
+ }
+ }
+
+ @Override
+ public Map<Set<String>, PlacementConstraint> getConstraints(
+ ApplicationId appId) {
+ try {
+ readLock.lock();
+ if (appConstraints.get(appId) == null) {
+ LOG.info("Application {} is not registered in the Placement "
+ + "Constraint Manager.", appId);
+ return null;
+ }
+
+ // Copy to a new map and return an unmodifiable version of it.
+ // Each key of the map is a set with a single source tag.
+ Map<Set<String>, PlacementConstraint> constraintMap =
+ appConstraints.get(appId).entrySet().stream()
+ .collect(Collectors.toMap(
+ e -> Stream.of(e.getKey()).collect(Collectors.toSet()),
+ e -> e.getValue()));
+
+ return Collections.unmodifiableMap(constraintMap);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public PlacementConstraint getConstraint(ApplicationId appId,
+ Set<String> sourceTags) {
+ if (!validateSourceTags(sourceTags)) {
+ return null;
+ }
+ String sourceTag = getValidSourceTag(sourceTags);
+ try {
+ readLock.lock();
+ if (appConstraints.get(appId) == null) {
+ LOG.info("Application {} is not registered in the Placement "
+ + "Constraint Manager.", appId);
+ return null;
+ }
+ // TODO: Merge this constraint with the global one for this tag, if one
+ // exists.
+ return appConstraints.get(appId).get(sourceTag);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public PlacementConstraint getGlobalConstraint(Set<String> sourceTags) {
+ if (!validateSourceTags(sourceTags)) {
+ return null;
+ }
+ String sourceTag = getValidSourceTag(sourceTags);
+ try {
+ readLock.lock();
+ return globalConstraints.get(sourceTag);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void unregisterApplication(ApplicationId appId) {
+ try {
+ writeLock.lock();
+ appConstraints.remove(appId);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void removeGlobalConstraint(Set<String> sourceTags) {
+ if (!validateSourceTags(sourceTags)) {
+ return;
+ }
+ String sourceTag = getValidSourceTag(sourceTags);
+ try {
+ writeLock.lock();
+ globalConstraints.remove(sourceTag);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public int getNumRegisteredApplications() {
+ try {
+ readLock.lock();
+ return appConstraints.size();
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public int getNumGlobalConstraints() {
+ try {
+ readLock.lock();
+ return globalConstraints.size();
+ } finally {
+ readLock.unlock();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb2b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
new file mode 100644
index 0000000..7725d0d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
@@ -0,0 +1,151 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+
+/**
+ * Interface for storing and retrieving placement constraints (see
+ * {@link PlacementConstraint}).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface PlacementConstraintManager {
+
+ /**
+ * Register all placement constraints of an application.
+ *
+ * @param appId the application ID
+ * @param constraintMap the map of allocation tags to constraints for this
+ * application
+ */
+ void registerApplication(ApplicationId appId,
+ Map<Set<String>, PlacementConstraint> constraintMap);
+
+ /**
+ * Add a placement constraint for a given application and a given set of
+ * (source) allocation tags. The constraint will be used on Scheduling
+ * Requests that carry this set of allocation tags.
+ * TODO: Support merge and not only replace when adding a constraint.
+ *
+ * @param appId the application ID
+ * @param sourceTags the set of allocation tags that will enable this
+ * constraint
+ * @param placementConstraint the constraint
+ * @param replace if true, an existing constraint for these tags will be
+ * replaced by the given one
+ */
+ void addConstraint(ApplicationId appId, Set<String> sourceTags,
+ PlacementConstraint placementConstraint, boolean replace);
+
+ /**
+ * Add a placement constraint that will be used globally. These constraints
+ * are added by the cluster administrator.
+ * TODO: Support merge and not only replace when adding a constraint.
+ *
+ * @param sourceTags the allocation tags that will enable this constraint
+ * @param placementConstraint the constraint
+ * @param replace if true, an existing constraint for these tags will be
+ * replaced by the given one
+ */
+ void addGlobalConstraint(Set<String> sourceTags,
+ PlacementConstraint placementConstraint, boolean replace);
+
+ /**
+ * Retrieve all constraints for a given application, along with the allocation
+ * tags that enable each constraint.
+ *
+ * @param appId the application ID
+ * @return the constraints for this application with the associated tags
+ */
+ Map<Set<String>, PlacementConstraint> getConstraints(ApplicationId appId);
+
+ /**
+ * Retrieve the placement constraint that is associated with a set of
+ * allocation tags for a given application.
+ *
+ * @param appId the application ID
+ * @param sourceTags the allocation tags that enable this constraint
+ * @return the constraint
+ */
+ PlacementConstraint getConstraint(ApplicationId appId,
+ Set<String> sourceTags);
+
+ /**
+ * Retrieve a global constraint that is associated with a given set of
+ * allocation tags.
+ *
+ * @param sourceTags the allocation tags that enable this constraint
+ * @return the constraint
+ */
+ PlacementConstraint getGlobalConstraint(Set<String> sourceTags);
+
+ /**
+ * Remove the constraints that correspond to a given application.
+ *
+ * @param appId the application that will be removed.
+ */
+ void unregisterApplication(ApplicationId appId);
+
+ /**
+ * Remove a global constraint that is associated with the given allocation
+ * tags.
+ *
+ * @param sourceTags the allocation tags
+ */
+ void removeGlobalConstraint(Set<String> sourceTags);
+
+ /**
+ * Returns the number of currently registered applications in the Placement
+ * Constraint Manager.
+ *
+ * @return number of registered applications.
+ */
+ int getNumRegisteredApplications();
+
+ /**
+ * Returns the number of global constraints registered in the Placement
+ * Constraint Manager.
+ *
+ * @return number of global constraints.
+ */
+ int getNumGlobalConstraints();
+
+ /**
+ * Validate a placement constraint and the set of allocation tags that will
+ * enable it.
+ *
+ * @param sourceTags the associated allocation tags
+ * @param placementConstraint the constraint
+ * @return true if constraint and tags are valid
+ */
+ default boolean validateConstraint(Set<String> sourceTags,
+ PlacementConstraint placementConstraint) {
+ return true;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb2b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java
new file mode 100644
index 0000000..967f251
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java
@@ -0,0 +1,93 @@
+/*
+ * *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+
+/**
+ * The service that implements the {@link PlacementConstraintManager} interface.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class PlacementConstraintManagerService extends AbstractService
+ implements PlacementConstraintManager {
+
+ protected static final Log LOG =
+ LogFactory.getLog(PlacementConstraintManagerService.class);
+
+ private PlacementConstraintManager placementConstraintManager = null;
+
+ public PlacementConstraintManagerService() {
+ super(PlacementConstraintManagerService.class.getName());
+ }
+
+ @Override
+ public boolean validateConstraint(Set<String> sourceTags,
+ PlacementConstraint placementConstraint) {
+ if (!validateSourceTags(sourceTags)) {
+ return false;
+ }
+ // TODO: Perform actual validation of the constraint (in YARN-6621).
+ // TODO: Perform satisfiability check for constraint.
+ return true;
+ }
+
+ /**
+ * Validates whether the allocation tags that will enable a constraint have
+ * the expected format. At the moment we support a single allocation tag per
+ * constraint.
+ *
+ * @param sourceTags the source allocation tags
+ * @return true if the tags have the expected format
+ */
+ protected boolean validateSourceTags(Set<String> sourceTags) {
+ if (sourceTags.isEmpty()) {
+ LOG.warn("A placement constraint cannot be associated with an empty "
+ + "set of tags.");
+ return false;
+ }
+ if (sourceTags.size() > 1) {
+ LOG.warn("Only a single tag can be associated with a placement "
+ + "constraint currently.");
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * This method will return a single allocation tag. It should be called after
+ * validating the tags by calling {@link #validateSourceTags}.
+ *
+ * @param sourceTags the source allocation tags
+ * @return the single source tag
+ */
+ protected String getValidSourceTag(Set<String> sourceTags) {
+ return sourceTags.iterator().next();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb2b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java
new file mode 100644
index 0000000..cbb7a55
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement
+ * contains classes related to scheduling containers using placement
+ * constraints.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1efb2b6f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java
new file mode 100644
index 0000000..abcab1a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.nodeAttribute;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link PlacementConstraintManagerService}.
+ */
+public class TestPlacementConstraintManagerService {
+
+ private PlacementConstraintManagerService pcm;
+
+ protected PlacementConstraintManagerService createPCM() {
+ return new MemoryPlacementConstraintManager();
+ }
+
+ private ApplicationId appId1, appId2;
+ private PlacementConstraint c1, c2, c3, c4;
+ private Set<String> sourceTag1, sourceTag2, sourceTag3, sourceTag4;
+ private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
+
+ @Before
+ public void before() {
+ this.pcm = createPCM();
+
+ // Build appIDs, constraints, source tags, and constraint map.
+ long ts = System.currentTimeMillis();
+ appId1 = BuilderUtils.newApplicationId(ts, 123);
+ appId2 = BuilderUtils.newApplicationId(ts, 234);
+
+ c1 = PlacementConstraints.build(targetIn(NODE, allocationTag("hbase-m")));
+ c2 = PlacementConstraints.build(targetIn(RACK, allocationTag("hbase-rs")));
+ c3 = PlacementConstraints
+ .build(targetNotIn(NODE, nodeAttribute("java", "1.8")));
+ c4 = PlacementConstraints
+ .build(targetCardinality(RACK, 2, 10, allocationTag("zk")));
+
+ sourceTag1 = new HashSet<>(Arrays.asList("spark"));
+ sourceTag2 = new HashSet<>(Arrays.asList("zk"));
+ sourceTag3 = new HashSet<>(Arrays.asList("storm"));
+ sourceTag4 = new HashSet<>(Arrays.asList("hbase-m", "hbase-sec"));
+
+ constraintMap1 = Stream
+ .of(new SimpleEntry<>(sourceTag1, c1),
+ new SimpleEntry<>(sourceTag2, c2))
+ .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+
+ constraintMap2 = Stream.of(new SimpleEntry<>(sourceTag3, c4))
+ .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+ }
+
+ @Test
+ public void testRegisterUnregisterApps() {
+ Assert.assertEquals(0, pcm.getNumRegisteredApplications());
+
+ // Register two applications.
+ pcm.registerApplication(appId1, constraintMap1);
+ Assert.assertEquals(1, pcm.getNumRegisteredApplications());
+ Map<Set<String>, PlacementConstraint> constrMap =
+ pcm.getConstraints(appId1);
+ Assert.assertNotNull(constrMap);
+ Assert.assertEquals(2, constrMap.size());
+ Assert.assertNotNull(constrMap.get(sourceTag1));
+ Assert.assertNotNull(constrMap.get(sourceTag2));
+
+ pcm.registerApplication(appId2, constraintMap2);
+ Assert.assertEquals(2, pcm.getNumRegisteredApplications());
+ constrMap = pcm.getConstraints(appId2);
+ Assert.assertNotNull(constrMap);
+ Assert.assertEquals(1, constrMap.size());
+ Assert.assertNotNull(constrMap.get(sourceTag3));
+ Assert.assertNull(constrMap.get(sourceTag2));
+
+ // Try to register the same app again.
+ pcm.registerApplication(appId2, constraintMap1);
+ Assert.assertEquals(2, pcm.getNumRegisteredApplications());
+
+ // Unregister appId1.
+ pcm.unregisterApplication(appId1);
+ Assert.assertEquals(1, pcm.getNumRegisteredApplications());
+ Assert.assertNull(pcm.getConstraints(appId1));
+ Assert.assertNotNull(pcm.getConstraints(appId2));
+ }
+
+ @Test
+ public void testAddConstraint() {
+ // Cannot add constraint to unregistered app.
+ Assert.assertEquals(0, pcm.getNumRegisteredApplications());
+ pcm.addConstraint(appId1, sourceTag1, c1, false);
+ Assert.assertEquals(0, pcm.getNumRegisteredApplications());
+
+ // Register application.
+ pcm.registerApplication(appId1, new HashMap<>());
+ Assert.assertEquals(1, pcm.getNumRegisteredApplications());
+ Assert.assertEquals(0, pcm.getConstraints(appId1).size());
+
+ // Add two constraints.
+ pcm.addConstraint(appId1, sourceTag1, c1, false);
+ pcm.addConstraint(appId1, sourceTag2, c3, false);
+ Assert.assertEquals(2, pcm.getConstraints(appId1).size());
+
+ // Constraint for sourceTag1 should not be replaced.
+ pcm.addConstraint(appId1, sourceTag1, c2, false);
+ Assert.assertEquals(2, pcm.getConstraints(appId1).size());
+ Assert.assertEquals(c1, pcm.getConstraint(appId1, sourceTag1));
+ Assert.assertNotEquals(c2, pcm.getConstraint(appId1, sourceTag1));
+
+ // Now c2 should replace c1 for sourceTag1.
+ pcm.addConstraint(appId1, sourceTag1, c2, true);
+ Assert.assertEquals(2, pcm.getConstraints(appId1).size());
+ Assert.assertEquals(c2, pcm.getConstraint(appId1, sourceTag1));
+ }
+
+ @Test
+ public void testGlobalConstraints() {
+ Assert.assertEquals(0, pcm.getNumGlobalConstraints());
+ pcm.addGlobalConstraint(sourceTag1, c1, false);
+ Assert.assertEquals(1, pcm.getNumGlobalConstraints());
+ Assert.assertNotNull(pcm.getGlobalConstraint(sourceTag1));
+
+ // Constraint for sourceTag1 should not be replaced.
+ pcm.addGlobalConstraint(sourceTag1, c2, false);
+ Assert.assertEquals(1, pcm.getNumGlobalConstraints());
+ Assert.assertEquals(c1, pcm.getGlobalConstraint(sourceTag1));
+ Assert.assertNotEquals(c2, pcm.getGlobalConstraint(sourceTag1));
+
+ // Now c2 should replace c1 for sourceTag1.
+ pcm.addGlobalConstraint(sourceTag1, c2, true);
+ Assert.assertEquals(1, pcm.getNumGlobalConstraints());
+ Assert.assertEquals(c2, pcm.getGlobalConstraint(sourceTag1));
+
+ pcm.removeGlobalConstraint(sourceTag1);
+ Assert.assertEquals(0, pcm.getNumGlobalConstraints());
+ }
+
+ @Test
+ public void testValidateConstraint() {
+ // At the moment we only disallow multiple source tags to be associated with
+ // a constraint. TODO: More tests to be added for YARN-6621.
+ Assert.assertTrue(pcm.validateConstraint(sourceTag1, c1));
+ Assert.assertFalse(pcm.validateConstraint(sourceTag4, c1));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org