Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/12 09:05:45 UTC

[GitHub] [spark] Ngone51 commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Ngone51 commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r726867133



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
-          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            long key = c.getUsed();
+          if (c.getUsed() > 0 && c.getMode() == mode) {
+            long key = c == requestingConsumer ? 0 : c.getUsed();
             List<MemoryConsumer> list =
                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
             list.add(c);
           }
         }
-        while (!sortedConsumers.isEmpty()) {
+        // Iteratively spill consumers until we've freed enough memory or run out of consumers.
+        while (got < required && !sortedConsumers.isEmpty()) {
           // Get the consumer using the least memory more than the remaining required memory.
           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
             sortedConsumers.ceilingEntry(required - got);
-          // No consumer has used memory more than the remaining required memory.
-          // Get the consumer of largest used memory.
+          // No consumer has enough memory on its own, start with spilling the biggest consumer.
           if (currentEntry == null) {
             currentEntry = sortedConsumers.lastEntry();
           }
           List<MemoryConsumer> cList = currentEntry.getValue();
-          MemoryConsumer c = cList.get(cList.size() - 1);
-          try {
-            long released = c.spill(required - got, consumer);
-            if (released > 0) {
-              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                Utils.bytesToString(released), c, consumer);
-              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-              if (got >= required) {
-                break;
-              }
-            } else {
-              cList.remove(cList.size() - 1);
-              if (cList.isEmpty()) {
-                sortedConsumers.remove(currentEntry.getKey());
-              }
-            }
-          } catch (ClosedByInterruptException e) {
-            // This called by user to kill a task (e.g: speculative task).
-            logger.error("error while calling spill() on " + c, e);
-            throw new RuntimeException(e.getMessage());
-          } catch (IOException e) {
-            logger.error("error while calling spill() on " + c, e);
-            // checkstyle.off: RegexpSinglelineJava
-            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
-              + e.getMessage());
-            // checkstyle.on: RegexpSinglelineJava
+          got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
           }
         }
       }
 
-      // Attempt to free up memory by self-spilling.
-      //
-      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
-      // immediately notify other tasks that memory has been freed, and they may acquire the
-      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
-      // try again in the next loop iteration.
-      while (got < required) {
-        try {
-          long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
-              Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-          } else {
-            // Self-spilling could not free up any more memory.
-            break;
-          }
-        } catch (ClosedByInterruptException e) {
-          // This called by user to kill a task (e.g: speculative task).
-          logger.error("error while calling spill() on " + consumer, e);
-          throw new RuntimeException(e.getMessage());
-        } catch (IOException e) {
-          logger.error("error while calling spill() on " + consumer, e);
-          // checkstyle.off: RegexpSinglelineJava
-          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
-            + e.getMessage());
-          // checkstyle.on: RegexpSinglelineJava
-        }
-      }
-
-      consumers.add(consumer);
-      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+      consumers.add(requestingConsumer);
+      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
+              requestingConsumer);
       return got;
     }
   }
 
+  /**
+   * Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
+   * spilling and then acquiring the freed memory. If no more memory can be spilled from
+   * `cList[idx]`, remove it from the list.
+   *
+   * @return number of bytes acquired (<= requested)
+   * @throws RuntimeException if task is interrupted
+   * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+   */
+  private long trySpillAndAcquire(MemoryConsumer requestingConsumer,
+                                  long requested, List<MemoryConsumer> cList, int idx) {
+    MemoryMode mode = requestingConsumer.getMode();
+    MemoryConsumer consumerToSpill = cList.get(idx);
+    logger.debug("Task {} try to spill {} from {} for {}", taskAttemptId,
+            Utils.bytesToString(requested), consumerToSpill, requestingConsumer);

Review comment:
       nit: 2 indents
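The removed loops in this hunk map spill() failures to two different errors, and the order of their catch clauses matters. Below is a minimal sketch of that mapping. `Spiller` and `SpillFailedError` are hypothetical stand-ins for `MemoryConsumer.spill()` and `SparkOutOfMemoryError`, not Spark classes.

```java
import java.io.IOException;
import java.nio.channels.ClosedByInterruptException;

// Simplified sketch of the spill() error handling in the removed loops.
// Spiller and SpillFailedError are hypothetical stand-ins, not Spark classes.
final class SpillErrorSketch {

  /** Hypothetical stand-in for MemoryConsumer.spill(). */
  interface Spiller {
    long spill(long size) throws IOException;
  }

  /** Hypothetical stand-in for SparkOutOfMemoryError. */
  static final class SpillFailedError extends Error {
    SpillFailedError(String message) {
      super(message);
    }
  }

  static long spillOrFail(Spiller spiller, long size) {
    try {
      return spiller.spill(size);
    } catch (ClosedByInterruptException e) {
      // Must precede the IOException clause: ClosedByInterruptException is an
      // IOException subclass, and javac rejects a catch clause that is already covered.
      // It signals that the task thread was interrupted (e.g. a killed speculative task).
      throw new RuntimeException(e.getMessage());
    } catch (IOException e) {
      // Any other I/O failure while spilling is surfaced as an out-of-memory error.
      throw new SpillFailedError("error while calling spill(): " + e.getMessage());
    }
  }
}
```

Swapping the two catch clauses would not compile. Keeping them separate is what lets an interrupted task fail with a plain `RuntimeException` instead of being reported as out of memory.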

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.

Review comment:
      nit: keep aligned with the first line

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);

Review comment:
       nit: 2 indents

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {

Review comment:
       `requestingConsumer` is not guaranteed to be included in `consumers` at this point?
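The question above can be made concrete with a simplified model of the selection loop in the quoted hunk. This is a sketch under two assumptions. First, `Consumer` is a hypothetical stand-in for `MemoryConsumer`. Second, each spill is assumed to free exactly the consumer's current usage. The model uses the same `TreeMap` keys, the same `ceilingEntry`/`lastEntry` fallback, and the same 0 key for the requester.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified model of the consumer-selection loop in the quoted hunk.
// Consumer is a hypothetical stand-in for MemoryConsumer, and each "spill" is
// assumed to free exactly the consumer's current usage.
final class SpillOrderSketch {

  static final class Consumer {
    final String name;
    final long used;

    Consumer(String name, long used) {
      this.name = name;
      this.used = used;
    }
  }

  /** Returns the names of the consumers spilled, in order, to free `required` bytes. */
  static List<String> spillOrder(List<Consumer> consumers, Consumer requesting, long required) {
    TreeMap<Long, List<Consumer>> sorted = new TreeMap<>();
    for (Consumer c : consumers) {
      if (c.used > 0) {
        // The requester is keyed at 0, so it is picked only when nothing else is left.
        long key = c == requesting ? 0 : c.used;
        sorted.computeIfAbsent(key, k -> new ArrayList<>(1)).add(c);
      }
    }
    List<String> order = new ArrayList<>();
    long got = 0;
    while (got < required && !sorted.isEmpty()) {
      // Smallest consumer whose usage alone covers the remaining shortfall...
      Map.Entry<Long, List<Consumer>> entry = sorted.ceilingEntry(required - got);
      if (entry == null) {
        // ...or, if none is big enough, the largest remaining consumer.
        entry = sorted.lastEntry();
      }
      List<Consumer> list = entry.getValue();
      Consumer victim = list.remove(list.size() - 1);
      order.add(victim.name);
      got += victim.used;
      if (list.isEmpty()) {
        sorted.remove(entry.getKey());
      }
    }
    return order;
  }
}
```

In the model, take A=100, B=300 and C=50 bytes, with a requester R holding 200 bytes. A 250-byte request spills only B. A 500-byte request spills B, A, C, then R. If R is missing from the input list, the 500-byte request stops after B, A and C with 450 bytes freed, and R is never spilled. That is the outcome the comment is asking about, since `consumers.add(requestingConsumer)` only runs at the end of the method. The removed code had a separate self-spill loop that did not depend on that membership.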

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
-          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            long key = c.getUsed();
+          if (c.getUsed() > 0 && c.getMode() == mode) {
+            long key = c == requestingConsumer ? 0 : c.getUsed();
             List<MemoryConsumer> list =
                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
             list.add(c);
           }
         }
-        while (!sortedConsumers.isEmpty()) {
+        // Iteratively spill consumers until we've freed enough memory or run out of consumers.
+        while (got < required && !sortedConsumers.isEmpty()) {
           // Get the consumer using the least memory more than the remaining required memory.
           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
             sortedConsumers.ceilingEntry(required - got);
-          // No consumer has used memory more than the remaining required memory.
-          // Get the consumer of largest used memory.
+          // No consumer has enough memory on its own, start with spilling the biggest consumer.
           if (currentEntry == null) {
             currentEntry = sortedConsumers.lastEntry();
           }
           List<MemoryConsumer> cList = currentEntry.getValue();
-          MemoryConsumer c = cList.get(cList.size() - 1);
-          try {
-            long released = c.spill(required - got, consumer);
-            if (released > 0) {
-              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                Utils.bytesToString(released), c, consumer);
-              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-              if (got >= required) {
-                break;
-              }
-            } else {
-              cList.remove(cList.size() - 1);
-              if (cList.isEmpty()) {
-                sortedConsumers.remove(currentEntry.getKey());
-              }
-            }
-          } catch (ClosedByInterruptException e) {
-            // This called by user to kill a task (e.g: speculative task).
-            logger.error("error while calling spill() on " + c, e);
-            throw new RuntimeException(e.getMessage());
-          } catch (IOException e) {
-            logger.error("error while calling spill() on " + c, e);
-            // checkstyle.off: RegexpSinglelineJava
-            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
-              + e.getMessage());
-            // checkstyle.on: RegexpSinglelineJava
+          got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
           }
         }
       }
 
-      // Attempt to free up memory by self-spilling.
-      //
-      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
-      // immediately notify other tasks that memory has been freed, and they may acquire the
-      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
-      // try again in the next loop iteration.
-      while (got < required) {
-        try {
-          long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
-              Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-          } else {
-            // Self-spilling could not free up any more memory.
-            break;
-          }
-        } catch (ClosedByInterruptException e) {
-          // This called by user to kill a task (e.g: speculative task).
-          logger.error("error while calling spill() on " + consumer, e);
-          throw new RuntimeException(e.getMessage());
-        } catch (IOException e) {
-          logger.error("error while calling spill() on " + consumer, e);
-          // checkstyle.off: RegexpSinglelineJava
-          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
-            + e.getMessage());
-          // checkstyle.on: RegexpSinglelineJava
-        }
-      }
-
-      consumers.add(consumer);
-      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+      consumers.add(requestingConsumer);
+      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
+              requestingConsumer);
       return got;
     }
   }
 
+  /**
+   * Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
+   * spilling and then acquiring the freed memory. If no more memory can be spilled from
+   * `cList[idx]`, remove it from the list.
+   *
+   * @return number of bytes acquired (<= requested)
+   * @throws RuntimeException if task is interrupted
+   * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+   */
+  private long trySpillAndAcquire(MemoryConsumer requestingConsumer,
+                                  long requested, List<MemoryConsumer> cList, int idx) {
+    MemoryMode mode = requestingConsumer.getMode();
+    MemoryConsumer consumerToSpill = cList.get(idx);
+    logger.debug("Task {} try to spill {} from {} for {}", taskAttemptId,
+            Utils.bytesToString(requested), consumerToSpill, requestingConsumer);
+    try {
+      long released = consumerToSpill.spill(requested, requestingConsumer);
+      if (released > 0) {
+        logger.debug("Task {} released {} of requested {} from {} for {}", taskAttemptId,

Review comment:
       `released` -> `spilled`?
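The naming question rests on two different quantities. The value returned by `spill()` is the number of bytes the consumer freed. The amount the helper finally acquires can be smaller, because, as the removed SPARK-35486 comment notes, other tasks may claim freed memory before the requester does. The sketch below models this. `Pool` and `spillThenAcquire` are hypothetical illustrations, not Spark's `ExecutionMemoryPool` or the PR's helper.

```java
import java.util.function.LongUnaryOperator;

// Sketch of why "bytes spilled" and "bytes acquired" can differ.
// Pool and spillThenAcquire are hypothetical, not Spark APIs.
final class SpillAcquireSketch {

  static final class Pool {
    private long free;

    Pool(long free) {
      this.free = free;
    }

    void release(long bytes) {
      free += bytes;
    }

    /** Grants up to `bytes`, limited by what is currently free. */
    long acquire(long bytes) {
      long granted = Math.min(bytes, free);
      free -= granted;
      return granted;
    }

    long free() {
      return free;
    }
  }

  /**
   * Spills, returns the freed bytes to the pool, then tries to re-acquire up to
   * `requested`. `competitorDemand` models another task that claims freed memory
   * first. Returns bytes acquired, which may be less than bytes spilled.
   */
  static long spillThenAcquire(
      Pool pool, long requested, LongUnaryOperator spill, long competitorDemand) {
    long spilled = spill.applyAsLong(requested);
    if (spilled <= 0) {
      return 0; // nothing left to spill from this consumer
    }
    pool.release(spilled);
    pool.acquire(competitorDemand); // another task may win the race for the freed bytes
    return pool.acquire(requested);
  }
}
```

Consider a consumer that spills 150 bytes for a 100-byte request while a competing task takes 80 bytes. The requester acquires only 70 bytes, yet a log line built from `spill()`'s return value would report 150. That gap is why `spilled` describes the logged value more precisely than `released`.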

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
-          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            long key = c.getUsed();
+          if (c.getUsed() > 0 && c.getMode() == mode) {
+            long key = c == requestingConsumer ? 0 : c.getUsed();
             List<MemoryConsumer> list =
                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
             list.add(c);
           }
         }
-        while (!sortedConsumers.isEmpty()) {
+        // Iteratively spill consumers until we've freed enough memory or run out of consumers.
+        while (got < required && !sortedConsumers.isEmpty()) {
           // Get the consumer using the least memory more than the remaining required memory.
           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
             sortedConsumers.ceilingEntry(required - got);
-          // No consumer has used memory more than the remaining required memory.
-          // Get the consumer of largest used memory.
+          // No consumer has enough memory on its own, start with spilling the biggest consumer.
           if (currentEntry == null) {
             currentEntry = sortedConsumers.lastEntry();
           }
           List<MemoryConsumer> cList = currentEntry.getValue();
-          MemoryConsumer c = cList.get(cList.size() - 1);
-          try {
-            long released = c.spill(required - got, consumer);
-            if (released > 0) {
-              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                Utils.bytesToString(released), c, consumer);
-              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-              if (got >= required) {
-                break;
-              }
-            } else {
-              cList.remove(cList.size() - 1);
-              if (cList.isEmpty()) {
-                sortedConsumers.remove(currentEntry.getKey());
-              }
-            }
-          } catch (ClosedByInterruptException e) {
-            // This called by user to kill a task (e.g: speculative task).
-            logger.error("error while calling spill() on " + c, e);
-            throw new RuntimeException(e.getMessage());
-          } catch (IOException e) {
-            logger.error("error while calling spill() on " + c, e);
-            // checkstyle.off: RegexpSinglelineJava
-            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
-              + e.getMessage());
-            // checkstyle.on: RegexpSinglelineJava
+          got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
           }
         }
       }
 
-      // Attempt to free up memory by self-spilling.
-      //
-      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
-      // immediately notify other tasks that memory has been freed, and they may acquire the
-      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
-      // try again in the next loop iteration.
-      while (got < required) {
-        try {
-          long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
-              Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-          } else {
-            // Self-spilling could not free up any more memory.
-            break;
-          }
-        } catch (ClosedByInterruptException e) {
-          // This called by user to kill a task (e.g: speculative task).
-          logger.error("error while calling spill() on " + consumer, e);
-          throw new RuntimeException(e.getMessage());
-        } catch (IOException e) {
-          logger.error("error while calling spill() on " + consumer, e);
-          // checkstyle.off: RegexpSinglelineJava
-          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
-            + e.getMessage());
-          // checkstyle.on: RegexpSinglelineJava
-        }
-      }
-
-      consumers.add(consumer);
-      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+      consumers.add(requestingConsumer);
+      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
+              requestingConsumer);
       return got;
     }
   }
 
+  /**
+   * Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
+   * spilling and then acquiring the freed memory. If no more memory can be spilled from
+   * `cList[idx]`, remove it from the list.
+   *
+   * @return number of bytes acquired (<= requested)
+   * @throws RuntimeException if task is interrupted
+   * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+   */
+  private long trySpillAndAcquire(MemoryConsumer requestingConsumer,
+                                  long requested, List<MemoryConsumer> cList, int idx) {

Review comment:
      ```suggestion
        private long trySpillAndAcquire(
            MemoryConsumer requestingConsumer,
            long requested, List<MemoryConsumer> cList,
            int idx) {
      ```

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
-          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            long key = c.getUsed();
+          if (c.getUsed() > 0 && c.getMode() == mode) {
+            long key = c == requestingConsumer ? 0 : c.getUsed();
             List<MemoryConsumer> list =
                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
             list.add(c);
           }
         }
-        while (!sortedConsumers.isEmpty()) {
+        // Iteratively spill consumers until we've freed enough memory or run out of consumers.
+        while (got < required && !sortedConsumers.isEmpty()) {
           // Get the consumer using the least memory more than the remaining required memory.
           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
             sortedConsumers.ceilingEntry(required - got);
-          // No consumer has used memory more than the remaining required memory.
-          // Get the consumer of largest used memory.
+          // No consumer has enough memory on its own, start with spilling the biggest consumer.
           if (currentEntry == null) {
             currentEntry = sortedConsumers.lastEntry();
           }
           List<MemoryConsumer> cList = currentEntry.getValue();
-          MemoryConsumer c = cList.get(cList.size() - 1);
-          try {
-            long released = c.spill(required - got, consumer);
-            if (released > 0) {
-              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                Utils.bytesToString(released), c, consumer);
-              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-              if (got >= required) {
-                break;
-              }
-            } else {
-              cList.remove(cList.size() - 1);
-              if (cList.isEmpty()) {
-                sortedConsumers.remove(currentEntry.getKey());
-              }
-            }
-          } catch (ClosedByInterruptException e) {
-            // This called by user to kill a task (e.g: speculative task).
-            logger.error("error while calling spill() on " + c, e);
-            throw new RuntimeException(e.getMessage());
-          } catch (IOException e) {
-            logger.error("error while calling spill() on " + c, e);
-            // checkstyle.off: RegexpSinglelineJava
-            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
-              + e.getMessage());
-            // checkstyle.on: RegexpSinglelineJava
+          got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
           }
         }
       }
 
-      // Attempt to free up memory by self-spilling.
-      //
-      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
-      // immediately notify other tasks that memory has been freed, and they may acquire the
-      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
-      // try again in the next loop iteration.
-      while (got < required) {
-        try {
-          long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
-              Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-          } else {
-            // Self-spilling could not free up any more memory.
-            break;
-          }
-        } catch (ClosedByInterruptException e) {
-          // This called by user to kill a task (e.g: speculative task).
-          logger.error("error while calling spill() on " + consumer, e);
-          throw new RuntimeException(e.getMessage());
-        } catch (IOException e) {
-          logger.error("error while calling spill() on " + consumer, e);
-          // checkstyle.off: RegexpSinglelineJava
-          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
-            + e.getMessage());
-          // checkstyle.on: RegexpSinglelineJava
-        }
-      }
-
-      consumers.add(consumer);
-      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+      consumers.add(requestingConsumer);
+      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
+              requestingConsumer);
       return got;
     }
   }
 
+  /**
+   * Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
+   * spilling and then acquiring the freed memory. If no more memory can be spilled from
+   * `cList[idx]`, remove it from the list.
+   *
+   * @return number of bytes acquired (<= requested)
+   * @throws RuntimeException if task is interrupted
+   * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+   */
+  private long trySpillAndAcquire(MemoryConsumer requestingConsumer,
+                                  long requested, List<MemoryConsumer> cList, int idx) {
+    MemoryMode mode = requestingConsumer.getMode();
+    MemoryConsumer consumerToSpill = cList.get(idx);
+    logger.debug("Task {} try to spill {} from {} for {}", taskAttemptId,
+            Utils.bytesToString(requested), consumerToSpill, requestingConsumer);
+    try {
+      long released = consumerToSpill.spill(requested, requestingConsumer);
+      if (released > 0) {
+        logger.debug("Task {} released {} of requested {} from {} for {}", taskAttemptId,
+                Utils.bytesToString(released), Utils.bytesToString(requested), consumerToSpill,
+                requestingConsumer);

Review comment:
       2 indents




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



For additional commands, e-mail: reviews-help@spark.apache.org