You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2021/12/21 19:31:34 UTC

[hadoop] branch branch-3.3 updated: YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract'. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi).

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new ccaba25  YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract'. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi).
ccaba25 is described below

commit ccaba2561a7b8aa06f33342d504e38a095932b00
Author: Eric Payne <ep...@apache.org>
AuthorDate: Tue Dec 21 19:05:39 2021 +0000

    YARN-10178: Global Scheduler async thread crash caused by 'Comparison method violates its general contract'. Contributed by Andras Gyori (gandras) and Qi Zhu (zhuqi).
    
    (cherry picked from commit e2d6fd075dff4e6ea290ec638f0a3f6688e76335)
---
 .../PriorityUtilizationQueueOrderingPolicy.java    | 91 +++++++++++++++-------
 1 file changed, 65 insertions(+), 26 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
index d3e2f89..995c2ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java
@@ -28,12 +28,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 /**
  * For two queues with the same priority:
@@ -101,19 +100,21 @@ public class PriorityUtilizationQueueOrderingPolicy
   /**
    * Comparator that both looks at priority and utilization
    */
-  private class PriorityQueueComparator implements Comparator<CSQueue> {
+  private class PriorityQueueComparator
+      implements Comparator<PriorityQueueResourcesForSorting> {
 
     @Override
-    public int compare(CSQueue q1, CSQueue q2) {
+    public int compare(PriorityQueueResourcesForSorting q1Sort,
+        PriorityQueueResourcesForSorting q2Sort) {
       String p = partitionToLookAt.get();
 
-      int rc = compareQueueAccessToPartition(q1, q2, p);
+      int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p);
       if (0 != rc) {
         return rc;
       }
 
-      float q1AbsCapacity = q1.getQueueCapacities().getAbsoluteCapacity(p);
-      float q2AbsCapacity = q2.getQueueCapacities().getAbsoluteCapacity(p);
+      float q1AbsCapacity = q1Sort.absoluteCapacity;
+      float q2AbsCapacity = q2Sort.absoluteCapacity;
 
       //If q1's abs capacity > 0 and q2 is 0, then prioritize q1
       if (Float.compare(q1AbsCapacity, 0f) > 0 && Float.compare(q2AbsCapacity,
@@ -127,28 +128,33 @@ public class PriorityUtilizationQueueOrderingPolicy
           q2AbsCapacity, 0f) == 0) {
         // both q1 has 0 and q2 has 0 capacity, then fall back to using
         // priority, abs used capacity to prioritize
-        float used1 = q1.getQueueCapacities().getAbsoluteUsedCapacity(p);
-        float used2 = q2.getQueueCapacities().getAbsoluteUsedCapacity(p);
+        float used1 = q1Sort.absoluteUsedCapacity;
+        float used2 = q2Sort.absoluteUsedCapacity;
 
-        return compare(q1, q2, used1, used2, p);
+        return compare(q1Sort, q2Sort, used1, used2,
+            q1Sort.queue.getPriority().
+                getPriority(), q2Sort.queue.getPriority().getPriority());
       } else{
         // both q1 has positive abs capacity and q2 has positive abs
         // capacity
-        float used1 = q1.getQueueCapacities().getUsedCapacity(p);
-        float used2 = q2.getQueueCapacities().getUsedCapacity(p);
+        float used1 = q1Sort.usedCapacity;
+        float used2 = q2Sort.usedCapacity;
 
-        return compare(q1, q2, used1, used2, p);
+        return compare(q1Sort, q2Sort, used1, used2,
+            q1Sort.queue.getPriority().getPriority(),
+            q2Sort.queue.getPriority().getPriority());
       }
     }
 
-    private int compare(CSQueue q1, CSQueue q2, float q1Used, float q2Used,
-        String partition) {
+    private int compare(PriorityQueueResourcesForSorting q1Sort,
+        PriorityQueueResourcesForSorting q2Sort, float q1Used,
+                        float q2Used, int q1Prior, int q2Prior) {
 
       int p1 = 0;
       int p2 = 0;
       if (respectPriority) {
-        p1 = q1.getPriority().getPriority();
-        p2 = q2.getPriority().getPriority();
+        p1 = q1Prior;
+        p2 = q2Prior;
       }
 
       int rc = PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used,
@@ -158,16 +164,16 @@ public class PriorityUtilizationQueueOrderingPolicy
       // capacity goes first
       if (0 == rc) {
         Resource minEffRes1 =
-            q1.getQueueResourceQuotas().getConfiguredMinResource(partition);
+            q1Sort.configuredMinResource;
         Resource minEffRes2 =
-            q2.getQueueResourceQuotas().getConfiguredMinResource(partition);
+            q2Sort.configuredMinResource;
         if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals(
             Resources.none())) {
           return minEffRes2.compareTo(minEffRes1);
         }
 
-        float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(partition);
-        float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(partition);
+        float abs1 = q1Sort.absoluteCapacity;
+        float abs2 = q2Sort.absoluteCapacity;
         return Float.compare(abs2, abs1);
       }
 
@@ -203,6 +209,37 @@ public class PriorityUtilizationQueueOrderingPolicy
     }
   }
 
+  /**
+   * A simple storage class to represent a snapshot of a queue.
+   */
+  public static class PriorityQueueResourcesForSorting {
+    private final float absoluteUsedCapacity;
+    private final float usedCapacity;
+    private final Resource configuredMinResource;
+    private final float absoluteCapacity;
+    private final CSQueue queue;
+
+    PriorityQueueResourcesForSorting(CSQueue queue) {
+      this.queue = queue;
+      this.absoluteUsedCapacity =
+          queue.getQueueCapacities().
+              getAbsoluteUsedCapacity(partitionToLookAt.get());
+      this.usedCapacity =
+          queue.getQueueCapacities().
+              getUsedCapacity(partitionToLookAt.get());
+      this.absoluteCapacity =
+          queue.getQueueCapacities().
+              getAbsoluteCapacity(partitionToLookAt.get());
+      this.configuredMinResource =
+          queue.getQueueResourceQuotas().
+              getConfiguredMinResource(partitionToLookAt.get());
+    }
+
+    public CSQueue getQueue() {
+      return queue;
+    }
+  }
+
   public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
     this.respectPriority = respectPriority;
   }
@@ -214,12 +251,14 @@ public class PriorityUtilizationQueueOrderingPolicy
 
   @Override
   public Iterator<CSQueue> getAssignmentIterator(String partition) {
-    // Since partitionToLookAt is a thread local variable, and every time we
-    // copy and sort queues, so it's safe for multi-threading environment.
+    // partitionToLookAt is a thread local variable, therefore it is safe to mutate it.
     PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
-    List<CSQueue> sortedQueue = new ArrayList<>(queues);
-    Collections.sort(sortedQueue, new PriorityQueueComparator());
-    return sortedQueue.iterator();
+
+    // Sort the snapshot of the queues in order to avoid breaking the prerequisites of TimSort.
+    // See YARN-10178 for details.
+    return queues.stream().map(PriorityQueueResourcesForSorting::new).sorted(
+        new PriorityQueueComparator()).map(PriorityQueueResourcesForSorting::getQueue).collect(
+            Collectors.toList()).iterator();
   }
 
   @Override

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org