You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2018/05/25 16:07:55 UTC

hadoop git commit: YARN-8292: Fix the dominant resource preemption cannot happen when some of the resource vector becomes negative. Contributed by Wangda Tan.

Repository: hadoop
Updated Branches:
  refs/heads/trunk bddfe796f -> 8d5509c68


YARN-8292: Fix the dominant resource preemption cannot happen when some of the resource vector becomes negative. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8d5509c6
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8d5509c6
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8d5509c6

Branch: refs/heads/trunk
Commit: 8d5509c68156faaa6641f4e747fc9ff80adccf88
Parents: bddfe79
Author: Eric E Payne <er...@oath.com>
Authored: Fri May 25 16:06:09 2018 +0000
Committer: Eric E Payne <er...@oath.com>
Committed: Fri May 25 16:06:09 2018 +0000

----------------------------------------------------------------------
 .../resource/DefaultResourceCalculator.java     |  15 ++-
 .../resource/DominantResourceCalculator.java    |  39 ++++---
 .../yarn/util/resource/ResourceCalculator.java  |  13 ++-
 .../hadoop/yarn/util/resource/Resources.java    |   5 -
 .../AbstractPreemptableResourceCalculator.java  |  58 ++++++++---
 .../CapacitySchedulerPreemptionUtils.java       |  61 +++++++++--
 .../capacity/FifoCandidatesSelector.java        |   8 +-
 .../FifoIntraQueuePreemptionPlugin.java         |   4 +-
 .../capacity/IntraQueueCandidatesSelector.java  |   2 +-
 .../capacity/PreemptableResourceCalculator.java |   6 +-
 .../monitor/capacity/TempQueuePerPartition.java |   8 +-
 ...alCapacityPreemptionPolicyMockFramework.java |  30 ++++++
 .../TestPreemptionForQueueWithPriorities.java   | 103 ++++++++++++-------
 ...pacityPreemptionPolicyInterQueueWithDRF.java |  60 ++++++++++-
 14 files changed, 312 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index 6375c4a..ab6d7f5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -136,13 +136,18 @@ public class DefaultResourceCalculator extends ResourceCalculator {
   }
 
   @Override
-  public boolean isAnyMajorResourceZero(Resource resource) {
-    return resource.getMemorySize() == 0f;
-  }
-
-  @Override
   public Resource normalizeDown(Resource r, Resource stepFactor) {
     return Resources.createResource(
         roundDown((r.getMemorySize()), stepFactor.getMemorySize()));
   }
+
+  @Override
+  public boolean isAnyMajorResourceZeroOrNegative(Resource resource) {
+    return resource.getMemorySize() <= 0;
+  }
+
+  @Override
+  public boolean isAnyMajorResourceAboveZero(Resource resource) {
+    return resource.getMemorySize() > 0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 6fed23b..2e85ebc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -577,19 +577,6 @@ public class DominantResourceCalculator extends ResourceCalculator {
   }
 
   @Override
-  public boolean isAnyMajorResourceZero(Resource resource) {
-    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
-    for (int i = 0; i < maxLength; i++) {
-      ResourceInformation resourceInformation = resource
-          .getResourceInformation(i);
-      if (resourceInformation.getValue() == 0L) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  @Override
   public Resource normalizeDown(Resource r, Resource stepFactor) {
     Resource ret = Resource.newInstance(r);
     int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
@@ -613,4 +600,30 @@ public class DominantResourceCalculator extends ResourceCalculator {
     }
     return ret;
   }
+
+  @Override
+  public boolean isAnyMajorResourceZeroOrNegative(Resource resource) {
+    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    for (int i = 0; i < maxLength; i++) {
+      ResourceInformation resourceInformation = resource.getResourceInformation(
+          i);
+      if (resourceInformation.getValue() <= 0L) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public boolean isAnyMajorResourceAboveZero(Resource resource) {
+    int maxLength = ResourceUtils.getNumberOfKnownResourceTypes();
+    for (int i = 0; i < maxLength; i++) {
+      ResourceInformation resourceInformation = resource.getResourceInformation(
+          i);
+      if (resourceInformation.getValue() > 0) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index 1c42126..51078cd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -239,12 +239,12 @@ public abstract class ResourceCalculator {
 
   /**
    * Check if resource has any major resource types (which are all NodeManagers
-   * included) a zero value.
+   * included) with a zero or negative value.
    *
    * @param resource resource
    * @return returns true if any resource is zero.
    */
-  public abstract boolean isAnyMajorResourceZero(Resource resource);
+  public abstract boolean isAnyMajorResourceZeroOrNegative(Resource resource);
 
   /**
    * Get resource <code>r</code>and normalize down using step-factor
@@ -257,4 +257,13 @@ public abstract class ResourceCalculator {
    * @return resulting normalized resource
    */
   public abstract Resource normalizeDown(Resource r, Resource stepFactor);
+
+  /**
+   * Check if any major resource type (which are all NodeManagers included)
+   * has a value greater than zero.
+   *
+   * @param resource resource
+   * @return true if any major resource type has a value greater than zero
+   */
+  public abstract boolean isAnyMajorResourceAboveZero(Resource resource);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 1c08844..7826f51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -547,11 +547,6 @@ public class Resources {
     return ret;
   }
 
-  public static boolean isAnyMajorResourceZero(ResourceCalculator rc,
-      Resource resource) {
-    return rc.isAnyMajorResourceZero(resource);
-  }
-
   public static Resource normalizeDown(ResourceCalculator calculator,
       Resource resource, Resource factor) {
     return calculator.normalizeDown(resource, factor);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index 2589970..64b3615 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.PriorityQueue;
-
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy;
@@ -32,6 +26,12 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
 /**
  * Calculate how much resources need to be preempted for each queue,
  * will be used by {@link PreemptionCandidatesSelector}.
@@ -40,7 +40,8 @@ public class AbstractPreemptableResourceCalculator {
 
   protected final CapacitySchedulerPreemptionContext context;
   protected final ResourceCalculator rc;
-  private boolean isReservedPreemptionCandidatesSelector;
+  protected boolean isReservedPreemptionCandidatesSelector;
+  private Resource stepFactor;
 
   static class TQComparator implements Comparator<TempQueuePerPartition> {
     private ResourceCalculator rc;
@@ -90,6 +91,11 @@ public class AbstractPreemptableResourceCalculator {
     rc = preemptionContext.getResourceCalculator();
     this.isReservedPreemptionCandidatesSelector =
         isReservedPreemptionCandidatesSelector;
+
+    stepFactor = Resource.newInstance(0, 0);
+    for (ResourceInformation ri : stepFactor.getResources()) {
+      ri.setValue(1);
+    }
   }
 
   /**
@@ -122,23 +128,24 @@ public class AbstractPreemptableResourceCalculator {
     TQComparator tqComparator = new TQComparator(rc, totGuarant);
     PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
         tqComparator);
-    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext(); ) {
       TempQueuePerPartition q = i.next();
       Resource used = q.getUsed();
 
       Resource initIdealAssigned;
       if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
-        initIdealAssigned =
-            Resources.add(q.getGuaranteed(), q.untouchableExtra);
-      } else {
+        initIdealAssigned = Resources.add(
+            Resources.componentwiseMin(q.getGuaranteed(), q.getUsed()),
+            q.untouchableExtra);
+      } else{
         initIdealAssigned = Resources.clone(used);
       }
 
       // perform initial assignment
       initIdealAssignment(totGuarant, q, initIdealAssigned);
 
-
       Resources.subtractFrom(unassigned, q.idealAssigned);
+
       // If idealAssigned < (allocated + used + pending), q needs more
       // resources, so
       // add it to the list of underserved queues, ordered by need.
@@ -152,7 +159,6 @@ public class AbstractPreemptableResourceCalculator {
     // left
     while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
         unassigned, Resources.none())) {
-      Resource wQassigned = Resource.newInstance(0, 0);
       // we compute normalizedGuarantees capacity based on currently active
       // queues
       resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
@@ -166,11 +172,26 @@ public class AbstractPreemptableResourceCalculator {
       Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
           orderedByNeed, tqComparator);
 
+      // This value is used throughout the round to calculate ideal allocation,
+      // so make a copy to keep it from changing during the calculation.
+      Resource dupUnassignedForTheRound = Resources.clone(unassigned);
+
       for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
           .hasNext();) {
+        if (!rc.isAnyMajorResourceAboveZero(unassigned)) {
+          break;
+        }
+
         TempQueuePerPartition sub = i.next();
-        Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
-            sub.normalizedGuarantee, Resource.newInstance(1, 1));
+
+        // Resource offered to the queue (to increase its ideal_alloc)
+        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
+            dupUnassignedForTheRound,
+            sub.normalizedGuarantee, this.stepFactor);
+
+        // Make sure it is not beyond unassigned
+        wQavail = Resources.componentwiseMin(wQavail, unassigned);
+
         Resource wQidle = sub.offer(wQavail, rc, totGuarant,
             isReservedPreemptionCandidatesSelector);
         Resource wQdone = Resources.subtract(wQavail, wQidle);
@@ -180,9 +201,12 @@ public class AbstractPreemptableResourceCalculator {
           // queue, recalculating its order based on need.
           orderedByNeed.add(sub);
         }
-        Resources.addTo(wQassigned, wQdone);
+
+        Resources.subtractFrom(unassigned, wQdone);
+
+        // Make sure unassigned never becomes negative
+        unassigned = Resources.componentwiseMax(unassigned, Resources.none());
       }
-      Resources.subtractFrom(unassigned, wQassigned);
     }
 
     // Sometimes its possible that, all queues are properly served. So intra

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index f097e9c..5396d61 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -132,6 +133,16 @@ public class CapacitySchedulerPreemptionUtils {
    *          map to hold preempted containers
    * @param totalPreemptionAllowed
    *          total preemption allowed per round
+   * @param conservativeDRF
+   *          should we do conservativeDRF preemption or not.
+   *          When true:
+   *            stop preempting containers when any major resource type of
+   *            to-preempt is zero or negative.
+   *            This is the default behavior of intra-queue preemption.
+   *          When false:
+   *            stop preempting containers when all major resource types of
+   *            to-preempt are zero or negative.
+   *            This is the default behavior of inter-queue preemption.
    * @return should we preempt rmContainer. If we should, deduct from
    *         <code>resourceToObtainByPartition</code>
    */
@@ -140,7 +151,7 @@ public class CapacitySchedulerPreemptionUtils {
       Map<String, Resource> resourceToObtainByPartitions,
       RMContainer rmContainer, Resource clusterResource,
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
-      Resource totalPreemptionAllowed) {
+      Resource totalPreemptionAllowed, boolean conservativeDRF) {
     ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
 
     // We will not account resource of a container twice or more
@@ -152,13 +163,49 @@ public class CapacitySchedulerPreemptionUtils {
         rmContainer.getAllocatedNode());
     Resource toObtainByPartition = resourceToObtainByPartitions
         .get(nodePartition);
+    if (null == toObtainByPartition) {
+      return false;
+    }
+
+    // If a toObtain resource type == 0, set it to -1 so that a zero-valued
+    // resource type does not affect the doPreempt checks below.
+    for (ResourceInformation ri : toObtainByPartition.getResources()) {
+      if (ri.getValue() == 0) {
+        ri.setValue(-1);
+      }
+    }
+
+    if (rc.isAnyMajorResourceAboveZero(toObtainByPartition) && Resources.fitsIn(
+        rc, rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
+      boolean doPreempt;
+
+      // How much resource is left to obtain after the preemption happens.
+      Resource toObtainAfterPreemption = Resources.subtract(toObtainByPartition,
+          rmContainer.getAllocatedResource());
+
+      if (conservativeDRF) {
+        doPreempt = !rc.isAnyMajorResourceZeroOrNegative(toObtainByPartition);
+      } else {
+        // When we want to do more aggressive preemption, we will do preemption
+        // only if:
+        // - Preempting the container makes a positive contribution to the
+        //   to-obtain resource. Positive contribution means any positive
+        //   resource type decreases.
+        //
+        //   This is an example of a positive contribution:
+        //     * before: <30, 10, 5>, after <20, 10, -10>
+        //   But this is not a positive contribution:
+        //     * before: <30, 10, 0>, after <30, 10, -15>
+        doPreempt = Resources.lessThan(rc, clusterResource,
+            Resources
+                .componentwiseMax(toObtainAfterPreemption, Resources.none()),
+            Resources.componentwiseMax(toObtainByPartition, Resources.none()));
+      }
+
+      if (!doPreempt) {
+        return false;
+      }
 
-    if (null != toObtainByPartition
-        && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
-            Resources.none())
-        && Resources.fitsIn(rc, rmContainer.getAllocatedResource(),
-            totalPreemptionAllowed)
-        && !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) {
       Resources.subtractFrom(toObtainByPartition,
           rmContainer.getAllocatedResource());
       Resources.subtractFrom(totalPreemptionAllowed,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index 748548a..3b2fcbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -111,7 +111,7 @@ public class FifoCandidatesSelector
                   .tryPreemptContainerAndDeductResToObtain(rc,
                       preemptionContext, resToObtainByPartition, c,
                       clusterResource, selectedCandidates,
-                      totalPreemptionAllowed);
+                      totalPreemptionAllowed, false);
               if (!preempted) {
                 continue;
               }
@@ -187,7 +187,7 @@ public class FifoCandidatesSelector
       boolean preempted = CapacitySchedulerPreemptionUtils
           .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
               resToObtainByPartition, c, clusterResource, preemptMap,
-              totalPreemptionAllowed);
+              totalPreemptionAllowed, false);
       if (preempted) {
         Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
       }
@@ -221,7 +221,7 @@ public class FifoCandidatesSelector
       // Try to preempt this container
       CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
           rc, preemptionContext, resToObtainByPartition, c, clusterResource,
-          selectedContainers, totalPreemptionAllowed);
+          selectedContainers, totalPreemptionAllowed, false);
 
       if (!preemptionContext.isObserveOnly()) {
         preemptionContext.getRMContext().getDispatcher().getEventHandler()
@@ -264,7 +264,7 @@ public class FifoCandidatesSelector
       // Try to preempt this container
       CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
           rc, preemptionContext, resToObtainByPartition, c, clusterResource,
-          selectedContainers, totalPreemptionAllowed);
+          selectedContainers, totalPreemptionAllowed, false);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 1776bd4..40f333f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -278,8 +278,8 @@ public class FifoIntraQueuePreemptionPlugin
 
       // Once unallocated resource is 0, we can stop assigning ideal per app.
       if (Resources.lessThanOrEqual(rc, clusterResource,
-          queueReassignableResource, Resources.none())
-          || Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) {
+          queueReassignableResource, Resources.none()) || rc
+          .isAnyMajorResourceZeroOrNegative(queueReassignableResource)) {
         continue;
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index 5b6932e..a91fac7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -230,7 +230,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
       boolean ret = CapacitySchedulerPreemptionUtils
           .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
               resToObtainByPartition, c, clusterResource, selectedCandidates,
-              totalPreemptedResourceAllowed);
+              totalPreemptedResourceAllowed, true);
 
       // Subtract from respective user's resource usage once a container is
       // selected for preemption.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 676c14f..08d834e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -41,8 +41,6 @@ public class PreemptableResourceCalculator
   private static final Log LOG =
       LogFactory.getLog(PreemptableResourceCalculator.class);
 
-  private boolean isReservedPreemptionCandidatesSelector;
-
   /**
    * PreemptableResourceCalculator constructor
    *
@@ -95,8 +93,8 @@ public class PreemptableResourceCalculator
     }
 
     // first compute the allocation as a fixpoint based on guaranteed capacity
-    computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned,
-        false);
+    computeFixpointAllocation(tot_guarant, new HashSet<>(nonZeroGuarQueues),
+        unassigned, false);
 
     // if any capacity is left unassigned, distributed among zero-guarantee
     // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 9d8297d..4214acc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -151,7 +151,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     //               # This is for leaf queue only.
     //               max(guaranteed, used) - assigned}
     // remain = avail - accepted
-    Resource accepted = Resources.min(rc, clusterResource,
+    Resource accepted = Resources.componentwiseMin(
         absMaxCapIdealAssignedDelta,
         Resources.min(rc, clusterResource, avail, Resources
             /*
@@ -186,6 +186,12 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
 
     accepted = acceptedByLocality(rc, accepted);
 
+    // accepted should never be < 0
+    accepted = Resources.componentwiseMax(accepted, Resources.none());
+
+    // or more than offered
+    accepted = Resources.componentwiseMin(accepted, avail);
+
     Resource remain = Resources.subtract(avail, accepted);
     Resources.addTo(idealAssigned, accepted);
     return remain;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index a8e2697..a972584 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -59,6 +60,7 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.mockito.ArgumentMatcher;
@@ -104,10 +106,32 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
   EventHandler<Event> mDisp = null;
   ProportionalCapacityPreemptionPolicy policy = null;
   Resource clusterResource = null;
+  // Initialize resource map
+  Map<String, ResourceInformation> riMap = new HashMap<>();
+
+  private void resetResourceInformationMap() {
+    // Initialize mandatory resources
+    ResourceInformation memory = ResourceInformation.newInstance(
+        ResourceInformation.MEMORY_MB.getName(),
+        ResourceInformation.MEMORY_MB.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+    ResourceInformation vcores = ResourceInformation.newInstance(
+        ResourceInformation.VCORES.getName(),
+        ResourceInformation.VCORES.getUnits(),
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
+    riMap.put(ResourceInformation.MEMORY_URI, memory);
+    riMap.put(ResourceInformation.VCORES_URI, vcores);
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+  }
 
   @SuppressWarnings("unchecked")
   @Before
   public void setup() {
+    resetResourceInformationMap();
+
     org.apache.log4j.Logger.getRootLogger().setLevel(
         org.apache.log4j.Level.DEBUG);
 
@@ -142,6 +166,12 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
     partitionToResource = new HashMap<>();
     nodeIdToSchedulerNodes = new HashMap<>();
     nameToCSQueues = new HashMap<>();
+    clusterResource = Resource.newInstance(0, 0);
+  }
+
+  @After
+  public void cleanup() {
+    resetResourceInformationMap();
   }
 
   public void buildEnv(String labelsConfig, String nodesConfig,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
index e9a8116..6a953cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
@@ -20,44 +20,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class TestPreemptionForQueueWithPriorities
     extends ProportionalCapacityPreemptionPolicyMockFramework {
-  // Initialize resource map
-  private Map<String, ResourceInformation> riMap = new HashMap<>();
-
   @Before
   public void setup() {
-
-    // Initialize mandatory resources
-    ResourceInformation memory = ResourceInformation.newInstance(
-        ResourceInformation.MEMORY_MB.getName(),
-        ResourceInformation.MEMORY_MB.getUnits(),
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
-    ResourceInformation vcores = ResourceInformation.newInstance(
-        ResourceInformation.VCORES.getName(),
-        ResourceInformation.VCORES.getUnits(),
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
-    riMap.put(ResourceInformation.MEMORY_URI, memory);
-    riMap.put(ResourceInformation.VCORES_URI, vcores);
-
-    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
-
+    rc = new DefaultResourceCalculator();
     super.setup();
     policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
   }
@@ -340,8 +321,8 @@ public class TestPreemptionForQueueWithPriorities
      *   - a2 (capacity=60), p=1
      * - b (capacity=30), p=1
      *   - b1 (capacity=50), p=1
-     *   - b1 (capacity=50), p=2
-     * - c (capacity=40), p=2
+     *   - b2 (capacity=50), p=2
+     * - c (capacity=40), p=1
      * </pre>
      */
     String labelsConfig = "=100,true"; // default partition
@@ -349,11 +330,11 @@ public class TestPreemptionForQueueWithPriorities
     String queuesConfig =
         // guaranteed,max,used,pending
         "root(=[100 100 100 100]);" + //root
-            "-a(=[30 100 40 50]){priority=1};" + // a
+            "-a(=[29 100 40 50]){priority=1};" + // a
             "--a1(=[12 100 20 50]){priority=1};" + // a1
-            "--a2(=[18 100 20 50]){priority=1};" + // a2
-            "-b(=[30 100 59 50]){priority=1};" + // b
-            "--b1(=[15 100 30 50]){priority=1};" + // b1
+            "--a2(=[17 100 20 50]){priority=1};" + // a2
+            "-b(=[31 100 59 50]){priority=1};" + // b
+            "--b1(=[16 100 30 50]){priority=1};" + // b1
             "--b2(=[15 100 29 50]){priority=2};" + // b2
             "-c(=[40 100 1 30]){priority=1}";   // c
     String appsConfig =
@@ -362,7 +343,7 @@ public class TestPreemptionForQueueWithPriorities
             "a2\t(1,1,n1,,20,false);" + // app2 in a2
             "b1\t(1,1,n1,,30,false);" + // app3 in b1
             "b2\t(1,1,n1,,29,false);" + // app4 in b2
-            "c\t(1,1,n1,,29,false)"; // app5 in c
+            "c\t(1,1,n1,,1,false)"; // app5 in c
 
 
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
@@ -370,16 +351,16 @@ public class TestPreemptionForQueueWithPriorities
 
     // Preemption should first divide capacities between a / b, and b2 should
     // get less preemption than b1 (because b2 has higher priority)
-    verify(mDisp, times(5)).handle(argThat(
+    verify(mDisp, times(6)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
-    verify(mDisp, never()).handle(argThat(
+    verify(mDisp, times(1)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(2))));
-    verify(mDisp, times(15)).handle(argThat(
+    verify(mDisp, times(13)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
-    verify(mDisp, times(9)).handle(argThat(
+    verify(mDisp, times(10)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(4))));
   }
@@ -426,7 +407,7 @@ public class TestPreemptionForQueueWithPriorities
 
     // Preemption should first divide capacities between a / b, and b1 should
     // get less preemption than b2 (because b1 has higher priority)
-    verify(mDisp, never()).handle(argThat(
+    verify(mDisp, times(3)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
     verify(mDisp, never()).handle(argThat(
@@ -505,4 +486,56 @@ public class TestPreemptionForQueueWithPriorities
             getAppAttemptId(3))));
   }
 
+  @Test
+  public void test3ResourceTypesInterQueuePreemption() throws IOException {
+    rc = new DominantResourceCalculator();
+    when(cs.getResourceCalculator()).thenReturn(rc);
+
+    // Initialize resource map
+    String RESOURCE_1 = "res1";
+    riMap.put(RESOURCE_1, ResourceInformation.newInstance(RESOURCE_1, "", 0,
+        ResourceTypes.COUNTABLE, 0, Integer.MAX_VALUE));
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *              root
+     *           /  \  \
+     *          a    b  c
+     * </pre>
+     *  A / B / C have 33.3 / 33.3 / 33.4 resources
+     *  Total cluster resource has mem=30, cpu=18, GPU=6
+     *  A uses mem=6, cpu=6, GPU=3
+     *  B uses mem=6, cpu=6, GPU=3
+     *  C is asking mem=1,cpu=1,GPU=1
+     *
+     *  We expect it can preempt from one of the jobs
+     */
+    String labelsConfig =
+        "=30:18:6,true;";
+    String nodesConfig =
+        "n1= res=30:18:6;"; // n1 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[30:18:6 30:18:6 12:12:6 1:1:1]){priority=1};" + //root
+            "-a(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // a
+            "-b(=[10:6:2 10:6:2 6:6:3 0:0:0]){priority=1};" + // b
+            "-c(=[10:6:2 10:6:2 0:0:0 1:1:1]){priority=2}"; // c
+    String appsConfig=
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,2:2:1,n1,,3,false);" +
+            "b\t" // app2 in b
+            + "(1,2:2:1,n1,,3,false)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(1)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d5509c6/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
index c8a1f0f..14a3a9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyInterQueueWithDRF.java
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
+import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
+import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import java.io.IOException;
+
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
@@ -41,8 +46,7 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
   }
 
   @Test
-  public void testInterQueuePreemptionWithMultipleResource()
-      throws Exception {
+  public void testInterQueuePreemptionWithMultipleResource() throws Exception {
     /**
      * Queue structure is:
      *
@@ -121,4 +125,52 @@ public class TestProportionalCapacityPreemptionPolicyInterQueueWithDRF
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
-}
+
+  @Test
+  public void test3ResourceTypesInterQueuePreemption() throws IOException {
+    // Initialize resource map
+    String RESOURCE_1 = "res1";
+    riMap.put(RESOURCE_1, ResourceInformation
+        .newInstance(RESOURCE_1, "", 0, ResourceTypes.COUNTABLE, 0,
+            Integer.MAX_VALUE));
+
+    ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
+
+    /*
+     *              root
+     *           /  \  \
+     *          a    b  c
+     *
+     *  A / B / C have 33.3 / 33.3 / 33.4 resources
+     *  Total cluster resource has mem=30, cpu=18, GPU=6
+     *  A uses mem=6, cpu=6, GPU=3
+     *  B uses mem=6, cpu=6, GPU=3
+     *  C is asking mem=1,cpu=1,GPU=1
+     *
+     *  We expect it can preempt from one of the jobs
+     */
+    String labelsConfig = "=30:18:6,true;";
+    String nodesConfig = "n1= res=30:18:6;"; // n1 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[30:18:6 30:18:6 12:12:6 1:1:1]);" + //root
+            "-a(=[10:7:2 10:6:3 6:6:3 0:0:0]);" + // a
+            "-b(=[10:6:2 10:6:3 6:6:3 0:0:0]);" + // b
+            "-c(=[10:5:2 10:6:2 0:0:0 1:1:1])"; // c
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,2:2:1,n1,,3,false);" + "b\t" // app2 in b
+            + "(1,2:2:1,n1,,3,false)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(1)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org