You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2018/01/17 21:58:59 UTC

[05/11] hadoop git commit: YARN-6596. Introduce Placement Constraint Manager module. (Konstantinos Karanasos via asuresh)

YARN-6596. Introduce Placement Constraint Manager module. (Konstantinos Karanasos via asuresh)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f650d39b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f650d39b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f650d39b

Branch: refs/heads/YARN-6592
Commit: f650d39b8a6890b55dd961781cb751e3a7c38c51
Parents: b3dd520
Author: Arun Suresh <as...@apache.org>
Authored: Fri Dec 22 13:26:30 2017 -0800
Committer: Arun Suresh <as...@apache.org>
Committed: Wed Jan 17 13:50:38 2018 -0800

----------------------------------------------------------------------
 .../resourcemanager/RMActiveServiceContext.java |  15 +
 .../yarn/server/resourcemanager/RMContext.java  |   6 +
 .../server/resourcemanager/RMContextImpl.java   |  13 +
 .../server/resourcemanager/ResourceManager.java |  13 +
 .../MemoryPlacementConstraintManager.java       | 282 +++++++++++++++++++
 .../constraint/PlacementConstraintManager.java  | 151 ++++++++++
 .../PlacementConstraintManagerService.java      |  93 ++++++
 .../scheduler/constraint/package-info.java      |  29 ++
 .../TestPlacementConstraintManagerService.java  | 182 ++++++++++++
 9 files changed, 784 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f650d39b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 4d0c230..06a1d00 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
@@ -109,6 +110,7 @@ public class RMActiveServiceContext {
   private RMAppLifetimeMonitor rmAppLifetimeMonitor;
   private QueueLimitCalculator queueLimitCalculator;
   private AllocationTagsManager allocationTagsManager;
+  private PlacementConstraintManager placementConstraintManager;
 
   public RMActiveServiceContext() {
     queuePlacementManager = new PlacementManager();
@@ -413,6 +415,19 @@ public class RMActiveServiceContext {
 
   @Private
   @Unstable
+  public PlacementConstraintManager getPlacementConstraintManager() {
+    return placementConstraintManager;
+  }
+
+  @Private
+  @Unstable
+  public void setPlacementConstraintManager(
+      PlacementConstraintManager placementConstraintManager) {
+    this.placementConstraintManager = placementConstraintManager;
+  }
+
+  @Private
+  @Unstable
   public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
     return rmDelegatedNodeLabelsUpdater;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f650d39b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index 00da108..eb91a31 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
@@ -171,4 +172,9 @@ public interface RMContext extends ApplicationMasterServiceContext {
   AllocationTagsManager getAllocationTagsManager();
 
   void setAllocationTagsManager(AllocationTagsManager allocationTagsManager);
+
+  PlacementConstraintManager getPlacementConstraintManager();
+
+  void setPlacementConstraintManager(
+      PlacementConstraintManager placementConstraintManager);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f650d39b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index da50ef8..0b6be72 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
@@ -516,6 +517,18 @@ public class RMContextImpl implements RMContext {
   }
 
   @Override
+  public PlacementConstraintManager getPlacementConstraintManager() {
+    return activeServiceContext.getPlacementConstraintManager();
+  }
+
+  @Override
+  public void setPlacementConstraintManager(
+      PlacementConstraintManager placementConstraintManager) {
+    activeServiceContext
+        .setPlacementConstraintManager(placementConstraintManager);
+  }
+
+  @Override
   public RMDelegatedNodeLabelsUpdater getRMDelegatedNodeLabelsUpdater() {
     return activeServiceContext.getRMDelegatedNodeLabelsUpdater();
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f650d39b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index d71f224..2396b94 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -94,6 +94,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManagerService;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -495,6 +497,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
   protected AllocationTagsManager createAllocationTagsManager() {
     return new AllocationTagsManager(this.rmContext);
   }
+
+  protected PlacementConstraintManagerService
+      createPlacementConstraintManager() {
+    // Use the in memory Placement Constraint Manager.
+    return new MemoryPlacementConstraintManager();
+  }
   
   protected DelegationTokenRenewer createDelegationTokenRenewer() {
     return new DelegationTokenRenewer();
@@ -618,6 +626,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
           createAllocationTagsManager();
       rmContext.setAllocationTagsManager(allocationTagsManager);
 
+      PlacementConstraintManagerService placementConstraintManager =
+          createPlacementConstraintManager();
+      addService(placementConstraintManager);
+      rmContext.setPlacementConstraintManager(placementConstraintManager);
+
       RMDelegatedNodeLabelsUpdater delegatedNodeLabelsUpdater =
           createRMDelegatedNodeLabelsUpdater();
       if (delegatedNodeLabelsUpdater != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f650d39b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
new file mode 100644
index 0000000..ceff6f6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java
@@ -0,0 +1,282 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In memory implementation of the {@link PlacementConstraintManagerService}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class MemoryPlacementConstraintManager
+    extends PlacementConstraintManagerService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MemoryPlacementConstraintManager.class);
+
+  private ReentrantReadWriteLock.ReadLock readLock;
+  private ReentrantReadWriteLock.WriteLock writeLock;
+
+  /**
+   * Stores the global constraints that will be manipulated by the cluster
+   * admin. The key of each entry is the tag that will enable the corresponding
+   * constraint.
+   */
+  private Map<String, PlacementConstraint> globalConstraints;
+  /**
+   * Stores the constraints for each application, along with the allocation tags
+   * that will enable each of the constraints for a given application.
+   */
+  private Map<ApplicationId, Map<String, PlacementConstraint>> appConstraints;
+
+  /**
+   * Creates an empty manager: no global constraints, no registered
+   * applications, and a read/write lock pair taken from a single
+   * {@link ReentrantReadWriteLock}.
+   */
+  public MemoryPlacementConstraintManager() {
+    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    this.readLock = rwLock.readLock();
+    this.writeLock = rwLock.writeLock();
+    this.globalConstraints = new HashMap<>();
+    this.appConstraints = new HashMap<>();
+  }
+
+  /**
+   * Delegates to the parent implementation; no additional initialization is
+   * performed by this class.
+   *
+   * @param conf the configuration handed to the parent's serviceInit
+   * @throws Exception if the parent's serviceInit fails
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Registers an application and stores those of its constraints that pass
+   * {@link #validateConstraint}. If the application is already registered, a
+   * warning is logged and its existing constraints are left untouched.
+   *
+   * @param appId the application ID
+   * @param constraintMap source tags mapped to the constraint they enable
+   */
+  @Override
+  public void registerApplication(ApplicationId appId,
+      Map<Set<String>, PlacementConstraint> constraintMap) {
+    // The "already registered?" check and the insertion must happen under a
+    // single write lock. Checking under the read lock and inserting under a
+    // separately acquired write lock would let two concurrent registrations
+    // of the same application both pass the check, with the later one
+    // silently overwriting the earlier one's constraints.
+    writeLock.lock();
+    try {
+      if (appConstraints.get(appId) != null) {
+        LOG.warn("Application {} has already been registered.", appId);
+        return;
+      }
+      // Go over each sourceTag-constraint pair, validate it, and add it to
+      // the constraint map for this app.
+      Map<String, PlacementConstraint> constraintsForApp = new HashMap<>();
+      for (Map.Entry<Set<String>, PlacementConstraint> entry : constraintMap
+          .entrySet()) {
+        Set<String> sourceTags = entry.getKey();
+        PlacementConstraint constraint = entry.getValue();
+        if (validateConstraint(sourceTags, constraint)) {
+          constraintsForApp.put(getValidSourceTag(sourceTags), constraint);
+        }
+      }
+      if (constraintsForApp.isEmpty()) {
+        LOG.info("Application {} was registered, but no constraints were "
+            + "added.", appId);
+      }
+      // The application is registered even when no constraint was accepted.
+      appConstraints.put(appId, constraintsForApp);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Adds a constraint to an application that has already been registered.
+   * For an unknown application only an informational message is logged.
+   *
+   * @param appId the application ID
+   * @param sourceTags the source tags that will enable the constraint
+   * @param placementConstraint the constraint to add
+   * @param replace if true, an existing constraint for these tags is
+   *          overwritten
+   */
+  @Override
+  public void addConstraint(ApplicationId appId, Set<String> sourceTags,
+      PlacementConstraint placementConstraint, boolean replace) {
+    try {
+      writeLock.lock();
+      Map<String, PlacementConstraint> registered = appConstraints.get(appId);
+      if (registered != null) {
+        // Known application: validate and store (or replace) the constraint.
+        addConstraintToMap(registered, sourceTags, placementConstraint,
+            replace);
+      } else {
+        LOG.info("Cannot add constraint to application {}, as it has not "
+            + "been registered yet.", appId);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Adds a constraint to the global constraint map while holding the write
+   * lock. Validation and the replace/keep decision are done by
+   * {@code addConstraintToMap}.
+   *
+   * @param sourceTags the source tags that will enable the constraint
+   * @param placementConstraint the constraint to add
+   * @param replace if true, an existing constraint for these tags is
+   *          overwritten
+   */
+  @Override
+  public void addGlobalConstraint(Set<String> sourceTags,
+      PlacementConstraint placementConstraint, boolean replace) {
+    try {
+      writeLock.lock();
+      addConstraintToMap(globalConstraints, sourceTags, placementConstraint,
+          replace);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Helper method that adds a constraint to a map for a given source tag.
+   * Assumes the caller already holds the lock that guards the constraint map.
+   * Nothing is added when the tags/constraint pair fails
+   * {@link #validateConstraint}.
+   *
+   * @param constraintMap constraint map to which the constraint will be added
+   * @param sourceTags the source tags that will enable this constraint
+   * @param placementConstraint the new constraint to be added
+   * @param replace if true, an existing constraint for these sourceTags will be
+   *          replaced with the new one
+   */
+  private void addConstraintToMap(
+      Map<String, PlacementConstraint> constraintMap, Set<String> sourceTags,
+      PlacementConstraint placementConstraint, boolean replace) {
+    if (!validateConstraint(sourceTags, placementConstraint)) {
+      return;
+    }
+    String sourceTag = getValidSourceTag(sourceTags);
+    boolean alreadyPresent = constraintMap.get(sourceTag) != null;
+    if (alreadyPresent && !replace) {
+      LOG.info("Constraint {} will not be added. There is already a "
+              + "constraint associated with tag {}.",
+          placementConstraint, sourceTag);
+      return;
+    }
+    // Only report a replacement when a constraint is really being replaced;
+    // a first-time add with replace=true is not a replacement.
+    if (alreadyPresent) {
+      LOG.info("Replacing the constraint associated with tag {} with {}.",
+          sourceTag, placementConstraint);
+    }
+    constraintMap.put(sourceTag, placementConstraint);
+  }
+
+  /**
+   * Returns a read-only snapshot of the constraints of an application, keyed
+   * by single-element tag sets.
+   *
+   * @param appId the application ID
+   * @return an unmodifiable copy of the application's constraints, or null if
+   *         the application is not registered
+   */
+  @Override
+  public Map<Set<String>, PlacementConstraint> getConstraints(
+      ApplicationId appId) {
+    try {
+      readLock.lock();
+      Map<String, PlacementConstraint> registered = appConstraints.get(appId);
+      if (registered == null) {
+        LOG.info("Application {} is not registered in the Placement "
+            + "Constraint Manager.", appId);
+        return null;
+      }
+
+      // Re-key the stored map: every source tag is wrapped in a set of its
+      // own, and the copy is handed out behind an unmodifiable view.
+      Map<Set<String>, PlacementConstraint> snapshot =
+          registered.entrySet().stream()
+              .collect(Collectors.toMap(
+                  entry -> Stream.of(entry.getKey())
+                      .collect(Collectors.toSet()),
+                  Map.Entry::getValue));
+      return Collections.unmodifiableMap(snapshot);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Looks up the constraint stored for an application under a source tag.
+   *
+   * @param appId the application ID
+   * @param sourceTags the source tags that enable the constraint
+   * @return the stored constraint, or null if the tags are rejected by
+   *         {@link #validateSourceTags}, the application is not registered,
+   *         or no constraint is stored for the tag
+   */
+  @Override
+  public PlacementConstraint getConstraint(ApplicationId appId,
+      Set<String> sourceTags) {
+    if (!validateSourceTags(sourceTags)) {
+      return null;
+    }
+    String tag = getValidSourceTag(sourceTags);
+    try {
+      readLock.lock();
+      Map<String, PlacementConstraint> registered = appConstraints.get(appId);
+      if (registered != null) {
+        // TODO: Merge this constraint with the global one for this tag, if
+        // one exists.
+        return registered.get(tag);
+      }
+      LOG.info("Application {} is not registered in the Placement "
+          + "Constraint Manager.", appId);
+      return null;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Looks up the global constraint stored under a source tag.
+   *
+   * @param sourceTags the source tags that enable the constraint
+   * @return the stored constraint, or null if the tags are rejected by
+   *         {@link #validateSourceTags} or nothing is stored for the tag
+   */
+  @Override
+  public PlacementConstraint getGlobalConstraint(Set<String> sourceTags) {
+    if (validateSourceTags(sourceTags)) {
+      String tag = getValidSourceTag(sourceTags);
+      try {
+        readLock.lock();
+        return globalConstraints.get(tag);
+      } finally {
+        readLock.unlock();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Removes the application's entry from the per-application constraint map
+   * while holding the write lock. Removing an application that has no entry
+   * leaves the map unchanged.
+   *
+   * @param appId the application ID to remove
+   */
+  @Override
+  public void unregisterApplication(ApplicationId appId) {
+    try {
+      writeLock.lock();
+      appConstraints.remove(appId);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Removes the global constraint stored under a source tag. Tags rejected by
+   * {@link #validateSourceTags} are ignored.
+   *
+   * @param sourceTags the source tags whose global constraint is removed
+   */
+  @Override
+  public void removeGlobalConstraint(Set<String> sourceTags) {
+    if (validateSourceTags(sourceTags)) {
+      String tag = getValidSourceTag(sourceTags);
+      try {
+        writeLock.lock();
+        globalConstraints.remove(tag);
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  }
+
+  /**
+   * Returns the size of the per-application constraint map, read under the
+   * read lock. Applications registered without any accepted constraint are
+   * counted too, since they still have an (empty) entry.
+   *
+   * @return number of registered applications
+   */
+  @Override
+  public int getNumRegisteredApplications() {
+    try {
+      readLock.lock();
+      return appConstraints.size();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the size of the global constraint map, read under the read lock.
+   *
+   * @return number of global constraints
+   */
+  @Override
+  public int getNumGlobalConstraints() {
+    try {
+      readLock.lock();
+      return globalConstraints.size();
+    } finally {
+      readLock.unlock();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f650d39b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
new file mode 100644
index 0000000..7725d0d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java
@@ -0,0 +1,151 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+
+/**
+ * Interface for storing and retrieving placement constraints (see
+ * {@link PlacementConstraint}).
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface PlacementConstraintManager {
+
+  /**
+   * Register all placement constraints of an application.
+   *
+   * @param appId the application ID
+   * @param constraintMap the map of allocation tags to constraints for this
+   *          application
+   */
+  void registerApplication(ApplicationId appId,
+      Map<Set<String>, PlacementConstraint> constraintMap);
+
+  /**
+   * Add a placement constraint for a given application and a given set of
+   * (source) allocation tags. The constraint will be used on Scheduling
+   * Requests that carry this set of allocation tags.
+   * TODO: Support merge and not only replace when adding a constraint.
+   *
+   * @param appId the application ID
+   * @param sourceTags the set of allocation tags that will enable this
+   *          constraint
+   * @param placementConstraint the constraint
+   * @param replace if true, an existing constraint for these tags will be
+   *          replaced by the given one
+   */
+  void addConstraint(ApplicationId appId, Set<String> sourceTags,
+      PlacementConstraint placementConstraint, boolean replace);
+
+  /**
+   * Add a placement constraint that will be used globally. These constraints
+   * are added by the cluster administrator.
+   * TODO: Support merge and not only replace when adding a constraint.
+   *
+   * @param sourceTags the allocation tags that will enable this constraint
+   * @param placementConstraint the constraint
+   * @param replace if true, an existing constraint for these tags will be
+   *          replaced by the given one
+   */
+  void addGlobalConstraint(Set<String> sourceTags,
+      PlacementConstraint placementConstraint, boolean replace);
+
+  /**
+   * Retrieve all constraints for a given application, along with the allocation
+   * tags that enable each constraint.
+   *
+   * @param appId the application ID
+   * @return the constraints for this application with the associated tags
+   */
+  Map<Set<String>, PlacementConstraint> getConstraints(ApplicationId appId);
+
+  /**
+   * Retrieve the placement constraint that is associated with a set of
+   * allocation tags for a given application.
+   *
+   * @param appId the application ID
+   * @param sourceTags the allocation tags that enable this constraint
+   * @return the constraint
+   */
+  PlacementConstraint getConstraint(ApplicationId appId,
+      Set<String> sourceTags);
+
+  /**
+   * Retrieve a global constraint that is associated with a given set of
+   * allocation tags.
+   *
+   * @param sourceTags the allocation tags that enable this constraint
+   * @return the constraint
+   */
+  PlacementConstraint getGlobalConstraint(Set<String> sourceTags);
+
+  /**
+   * Remove the constraints that correspond to a given application.
+   *
+   * @param appId the application that will be removed.
+   */
+  void unregisterApplication(ApplicationId appId);
+
+  /**
+   * Remove a global constraint that is associated with the given allocation
+   * tags.
+   *
+   * @param sourceTags the allocation tags
+   */
+  void removeGlobalConstraint(Set<String> sourceTags);
+
+  /**
+   * Returns the number of currently registered applications in the Placement
+   * Constraint Manager.
+   *
+   * @return number of registered applications.
+   */
+  int getNumRegisteredApplications();
+
+  /**
+   * Returns the number of global constraints registered in the Placement
+   * Constraint Manager.
+   *
+   * @return number of global constraints.
+   */
+  int getNumGlobalConstraints();
+
+  /**
+   * Validate a placement constraint and the set of allocation tags that will
+   * enable it.
+   *
+   * The default implementation performs no checks and accepts every input;
+   * implementations override it to apply actual validation.
+   *
+   * @param sourceTags the associated allocation tags
+   * @param placementConstraint the constraint
+   * @return true if constraint and tags are valid
+   */
+  default boolean validateConstraint(Set<String> sourceTags,
+      PlacementConstraint placementConstraint) {
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f650d39b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java
new file mode 100644
index 0000000..967f251
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManagerService.java
@@ -0,0 +1,93 @@
+/*
+ * *
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ * /
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+
+/**
+ * The service that implements the {@link PlacementConstraintManager} interface.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class PlacementConstraintManagerService extends AbstractService
+    implements PlacementConstraintManager {
+
+  protected static final Log LOG =
+      LogFactory.getLog(PlacementConstraintManagerService.class);
+
+  private PlacementConstraintManager placementConstraintManager = null;
+
+  /**
+   * Passes the fully qualified name of this class to the
+   * {@link AbstractService} constructor.
+   */
+  public PlacementConstraintManagerService() {
+    super(PlacementConstraintManagerService.class.getName());
+  }
+
+  /**
+   * Validates a constraint together with its source tags. Currently only the
+   * source tags are checked; the constraint itself is not inspected.
+   *
+   * @param sourceTags the source allocation tags
+   * @param placementConstraint the constraint
+   * @return true if the source tags pass {@link #validateSourceTags}
+   */
+  @Override
+  public boolean validateConstraint(Set<String> sourceTags,
+      PlacementConstraint placementConstraint) {
+    // TODO: Perform actual validation of the constraint (in YARN-6621).
+    // TODO: Perform satisfiability check for constraint.
+    // Until then, the outcome is decided by the source tags alone.
+    return validateSourceTags(sourceTags);
+  }
+
+  /**
+   * Validates whether the allocation tags that will enable a constraint have
+   * the expected format. At the moment we support a single allocation tag per
+   * constraint. A null set is rejected in the same way as an empty one.
+   *
+   * @param sourceTags the source allocation tags, may be null
+   * @return true if the set contains exactly one tag
+   */
+  protected boolean validateSourceTags(Set<String> sourceTags) {
+    // Treat null like "no tags" so that callers get a clean rejection rather
+    // than a NullPointerException from isEmpty().
+    if (sourceTags == null || sourceTags.isEmpty()) {
+      LOG.warn("A placement constraint cannot be associated with an empty "
+          + "set of tags.");
+      return false;
+    }
+    if (sourceTags.size() > 1) {
+      LOG.warn("Only a single tag can be associated with a placement "
+          + "constraint currently.");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * This method will return a single allocation tag. It should be called after
+   * validating the tags by calling {@link #validateSourceTags}.
+   *
+   * The tag is the first element produced by the set's iterator. On a set
+   * with several tags this is whichever element the iterator yields first;
+   * on an empty set the iterator's {@code next()} throws
+   * {@link java.util.NoSuchElementException}.
+   *
+   * @param sourceTags the source allocation tags
+   * @return the single source tag
+   */
+  protected String getValidSourceTag(Set<String> sourceTags) {
+    return sourceTags.iterator().next();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f650d39b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java
new file mode 100644
index 0000000..cbb7a55
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint
+ * contains classes related to scheduling containers using placement
+ * constraints.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f650d39b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java
new file mode 100644
index 0000000..abcab1a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
+
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
+import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.nodeAttribute;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
+import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Unit tests for {@link PlacementConstraintManagerService}.
+ */
+public class TestPlacementConstraintManagerService {
+
+  private PlacementConstraintManagerService pcm;
+
+  protected PlacementConstraintManagerService createPCM() {
+    return new MemoryPlacementConstraintManager();
+  }
+
+  private ApplicationId appId1, appId2;
+  private PlacementConstraint c1, c2, c3, c4;
+  private Set<String> sourceTag1, sourceTag2, sourceTag3, sourceTag4;
+  private Map<Set<String>, PlacementConstraint> constraintMap1, constraintMap2;
+
+  @Before
+  public void before() {
+    this.pcm = createPCM();
+
+    // Build appIDs, constraints, source tags, and constraint map.
+    long ts = System.currentTimeMillis();
+    appId1 = BuilderUtils.newApplicationId(ts, 123);
+    appId2 = BuilderUtils.newApplicationId(ts, 234);
+
+    c1 = PlacementConstraints.build(targetIn(NODE, allocationTag("hbase-m")));
+    c2 = PlacementConstraints.build(targetIn(RACK, allocationTag("hbase-rs")));
+    c3 = PlacementConstraints
+        .build(targetNotIn(NODE, nodeAttribute("java", "1.8")));
+    c4 = PlacementConstraints
+        .build(targetCardinality(RACK, 2, 10, allocationTag("zk")));
+
+    sourceTag1 = new HashSet<>(Arrays.asList("spark"));
+    sourceTag2 = new HashSet<>(Arrays.asList("zk"));
+    sourceTag3 = new HashSet<>(Arrays.asList("storm"));
+    sourceTag4 = new HashSet<>(Arrays.asList("hbase-m", "hbase-sec"));
+
+    constraintMap1 = Stream
+        .of(new SimpleEntry<>(sourceTag1, c1),
+            new SimpleEntry<>(sourceTag2, c2))
+        .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+
+    constraintMap2 = Stream.of(new SimpleEntry<>(sourceTag3, c4))
+        .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+  }
+
+  @Test
+  public void testRegisterUnregisterApps() {
+    Assert.assertEquals(0, pcm.getNumRegisteredApplications());
+
+    // Register two applications.
+    pcm.registerApplication(appId1, constraintMap1);
+    Assert.assertEquals(1, pcm.getNumRegisteredApplications());
+    Map<Set<String>, PlacementConstraint> constrMap =
+        pcm.getConstraints(appId1);
+    Assert.assertNotNull(constrMap);
+    Assert.assertEquals(2, constrMap.size());
+    Assert.assertNotNull(constrMap.get(sourceTag1));
+    Assert.assertNotNull(constrMap.get(sourceTag2));
+
+    pcm.registerApplication(appId2, constraintMap2);
+    Assert.assertEquals(2, pcm.getNumRegisteredApplications());
+    constrMap = pcm.getConstraints(appId2);
+    Assert.assertNotNull(constrMap);
+    Assert.assertEquals(1, constrMap.size());
+    Assert.assertNotNull(constrMap.get(sourceTag3));
+    Assert.assertNull(constrMap.get(sourceTag2));
+
+    // Try to register the same app again.
+    pcm.registerApplication(appId2, constraintMap1);
+    Assert.assertEquals(2, pcm.getNumRegisteredApplications());
+
+    // Unregister appId1.
+    pcm.unregisterApplication(appId1);
+    Assert.assertEquals(1, pcm.getNumRegisteredApplications());
+    Assert.assertNull(pcm.getConstraints(appId1));
+    Assert.assertNotNull(pcm.getConstraints(appId2));
+  }
+
+  @Test
+  public void testAddConstraint() {
+    // Cannot add constraint to unregistered app.
+    Assert.assertEquals(0, pcm.getNumRegisteredApplications());
+    pcm.addConstraint(appId1, sourceTag1, c1, false);
+    Assert.assertEquals(0, pcm.getNumRegisteredApplications());
+
+    // Register application.
+    pcm.registerApplication(appId1, new HashMap<>());
+    Assert.assertEquals(1, pcm.getNumRegisteredApplications());
+    Assert.assertEquals(0, pcm.getConstraints(appId1).size());
+
+    // Add two constraints.
+    pcm.addConstraint(appId1, sourceTag1, c1, false);
+    pcm.addConstraint(appId1, sourceTag2, c3, false);
+    Assert.assertEquals(2, pcm.getConstraints(appId1).size());
+
+    // Constraint for sourceTag1 should not be replaced.
+    pcm.addConstraint(appId1, sourceTag1, c2, false);
+    Assert.assertEquals(2, pcm.getConstraints(appId1).size());
+    Assert.assertEquals(c1, pcm.getConstraint(appId1, sourceTag1));
+    Assert.assertNotEquals(c2, pcm.getConstraint(appId1, sourceTag1));
+
+    // Now c2 should replace c1 for sourceTag1.
+    pcm.addConstraint(appId1, sourceTag1, c2, true);
+    Assert.assertEquals(2, pcm.getConstraints(appId1).size());
+    Assert.assertEquals(c2, pcm.getConstraint(appId1, sourceTag1));
+  }
+
+  @Test
+  public void testGlobalConstraints() {
+    Assert.assertEquals(0, pcm.getNumGlobalConstraints());
+    pcm.addGlobalConstraint(sourceTag1, c1, false);
+    Assert.assertEquals(1, pcm.getNumGlobalConstraints());
+    Assert.assertNotNull(pcm.getGlobalConstraint(sourceTag1));
+
+    // Constraint for sourceTag1 should not be replaced.
+    pcm.addGlobalConstraint(sourceTag1, c2, false);
+    Assert.assertEquals(1, pcm.getNumGlobalConstraints());
+    Assert.assertEquals(c1, pcm.getGlobalConstraint(sourceTag1));
+    Assert.assertNotEquals(c2, pcm.getGlobalConstraint(sourceTag1));
+
+    // Now c2 should replace c1 for sourceTag1.
+    pcm.addGlobalConstraint(sourceTag1, c2, true);
+    Assert.assertEquals(1, pcm.getNumGlobalConstraints());
+    Assert.assertEquals(c2, pcm.getGlobalConstraint(sourceTag1));
+
+    pcm.removeGlobalConstraint(sourceTag1);
+    Assert.assertEquals(0, pcm.getNumGlobalConstraints());
+  }
+
+  @Test
+  public void testValidateConstraint() {
+    // At the moment we only disallow multiple source tags to be associated with
+    // a constraint. TODO: More tests to be added for YARN-6621.
+    Assert.assertTrue(pcm.validateConstraint(sourceTag1, c1));
+    Assert.assertFalse(pcm.validateConstraint(sourceTag4, c1));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org