You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/10/05 22:59:38 UTC

[GitHub] [spark] timarmstrong opened a new pull request #34186: [SPARK-36933][CORE] clean up TaskMemoryManager.acquireExecutionMemory()

timarmstrong opened a new pull request #34186:
URL: https://github.com/apache/spark/pull/34186


   ### What changes were proposed in this pull request?
   * Factor out a method `trySpillAndAcquire()` from `acquireExecutionMemory()` that handles the details of how to spill a `MemoryConsumer` and acquire the spilled memory. This logic was duplicated twice.
   * Combine the two loops (spill other consumers and self-spill) into a single loop that implements equivalent logic. I made self-spill the lowest priority consumer and this is exactly equivalent.
   * Consolidate comments a little to explain what the policy is trying to achieve and how at a high level
   
   ### Why are the changes needed?
   Reduce code duplication and better separate the policy decision of which MemoryConsumer to spill from the mechanism of requesting it to spill.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added some unit tests to verify the details of the spilling decisions in some scenarios that are not covered by current unit tests. Ran these on Spark master without the TaskMemoryManager changes to confirm that the behaviour is the same before and after my refactoring.
   
   The SPARK-35486 test also provides some coverage for the retry loop.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] gengliangwang commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
gengliangwang commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936612553


   cc @Ngone51 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936998048






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936960748


   **[Test build #143883 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143883/testReport)** for PR 34186 at commit [`e694349`](https://github.com/apache/spark/commit/e694349dc942115c6d873a8f2607596cba31f419).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] timarmstrong commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
timarmstrong commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r728651379



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {

Review comment:
       Yeah you are right that it can call `spill()` on itself in that case. I believe that should always be a no-op because `acquireExecutionMemory()` has not returned to the consumer and the consumer could not have allocated any memory.
   
   I think your suggestion makes the logic easier to understand anyway, so let's do it.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] timarmstrong commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
timarmstrong commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-942538533


   Thanks for the review and sorry for the formatting issues - are there any tools that can help get the right java formatting automatically?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r728741025



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {

Review comment:
       >... because `acquireExecutionMemory()` has not returned to the consumer and the consumer could not have allocated any memory.
   
   Do you mean it's always a no-op for the first time? And I don't get this explanation well. Could you elaborate more?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936675903


   **[Test build #143885 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143885/testReport)** for PR 34186 at commit [`e694349`](https://github.com/apache/spark/commit/e694349dc942115c6d873a8f2607596cba31f419).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r726867133



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
-          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            long key = c.getUsed();
+          if (c.getUsed() > 0 && c.getMode() == mode) {
+            long key = c == requestingConsumer ? 0 : c.getUsed();
             List<MemoryConsumer> list =
                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
             list.add(c);
           }
         }
-        while (!sortedConsumers.isEmpty()) {
+        // Iteratively spill consumers until we've freed enough memory or run out of consumers.
+        while (got < required && !sortedConsumers.isEmpty()) {
           // Get the consumer using the least memory more than the remaining required memory.
           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
             sortedConsumers.ceilingEntry(required - got);
-          // No consumer has used memory more than the remaining required memory.
-          // Get the consumer of largest used memory.
+          // No consumer has enough memory on its own, start with spilling the biggest consumer.
           if (currentEntry == null) {
             currentEntry = sortedConsumers.lastEntry();
           }
           List<MemoryConsumer> cList = currentEntry.getValue();
-          MemoryConsumer c = cList.get(cList.size() - 1);
-          try {
-            long released = c.spill(required - got, consumer);
-            if (released > 0) {
-              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                Utils.bytesToString(released), c, consumer);
-              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-              if (got >= required) {
-                break;
-              }
-            } else {
-              cList.remove(cList.size() - 1);
-              if (cList.isEmpty()) {
-                sortedConsumers.remove(currentEntry.getKey());
-              }
-            }
-          } catch (ClosedByInterruptException e) {
-            // This called by user to kill a task (e.g: speculative task).
-            logger.error("error while calling spill() on " + c, e);
-            throw new RuntimeException(e.getMessage());
-          } catch (IOException e) {
-            logger.error("error while calling spill() on " + c, e);
-            // checkstyle.off: RegexpSinglelineJava
-            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
-              + e.getMessage());
-            // checkstyle.on: RegexpSinglelineJava
+          got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
           }
         }
       }
 
-      // Attempt to free up memory by self-spilling.
-      //
-      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
-      // immediately notify other tasks that memory has been freed, and they may acquire the
-      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
-      // try again in the next loop iteration.
-      while (got < required) {
-        try {
-          long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
-              Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-          } else {
-            // Self-spilling could not free up any more memory.
-            break;
-          }
-        } catch (ClosedByInterruptException e) {
-          // This called by user to kill a task (e.g: speculative task).
-          logger.error("error while calling spill() on " + consumer, e);
-          throw new RuntimeException(e.getMessage());
-        } catch (IOException e) {
-          logger.error("error while calling spill() on " + consumer, e);
-          // checkstyle.off: RegexpSinglelineJava
-          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
-            + e.getMessage());
-          // checkstyle.on: RegexpSinglelineJava
-        }
-      }
-
-      consumers.add(consumer);
-      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+      consumers.add(requestingConsumer);
+      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
+              requestingConsumer);
       return got;
     }
   }
 
+  /**
+   * Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
+   * spilling and then acquiring the freed memory. If no more memory can be spilled from
+   * `cList[idx]`, remove it from the list.
+   *
+   * @return number of bytes acquired (<= requested)
+   * @throws RuntimeException if task is interrupted
+   * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+   */
+  private long trySpillAndAcquire(MemoryConsumer requestingConsumer,
+                                  long requested, List<MemoryConsumer> cList, int idx) {
+    MemoryMode mode = requestingConsumer.getMode();
+    MemoryConsumer consumerToSpill = cList.get(idx);
+    logger.debug("Task {} try to spill {} from {} for {}", taskAttemptId,
+            Utils.bytesToString(requested), consumerToSpill, requestingConsumer);

Review comment:
       nit: 2 indents

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.

Review comment:
       nit: keep align with the first line 

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);

Review comment:
       nit: 2 indents

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {

Review comment:
       `requestingConsumer` is not guaranteed to be included in `consumers` at this point?

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
-          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            long key = c.getUsed();
+          if (c.getUsed() > 0 && c.getMode() == mode) {
+            long key = c == requestingConsumer ? 0 : c.getUsed();
             List<MemoryConsumer> list =
                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
             list.add(c);
           }
         }
-        while (!sortedConsumers.isEmpty()) {
+        // Iteratively spill consumers until we've freed enough memory or run out of consumers.
+        while (got < required && !sortedConsumers.isEmpty()) {
           // Get the consumer using the least memory more than the remaining required memory.
           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
             sortedConsumers.ceilingEntry(required - got);
-          // No consumer has used memory more than the remaining required memory.
-          // Get the consumer of largest used memory.
+          // No consumer has enough memory on its own, start with spilling the biggest consumer.
           if (currentEntry == null) {
             currentEntry = sortedConsumers.lastEntry();
           }
           List<MemoryConsumer> cList = currentEntry.getValue();
-          MemoryConsumer c = cList.get(cList.size() - 1);
-          try {
-            long released = c.spill(required - got, consumer);
-            if (released > 0) {
-              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                Utils.bytesToString(released), c, consumer);
-              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-              if (got >= required) {
-                break;
-              }
-            } else {
-              cList.remove(cList.size() - 1);
-              if (cList.isEmpty()) {
-                sortedConsumers.remove(currentEntry.getKey());
-              }
-            }
-          } catch (ClosedByInterruptException e) {
-            // This called by user to kill a task (e.g: speculative task).
-            logger.error("error while calling spill() on " + c, e);
-            throw new RuntimeException(e.getMessage());
-          } catch (IOException e) {
-            logger.error("error while calling spill() on " + c, e);
-            // checkstyle.off: RegexpSinglelineJava
-            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
-              + e.getMessage());
-            // checkstyle.on: RegexpSinglelineJava
+          got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
           }
         }
       }
 
-      // Attempt to free up memory by self-spilling.
-      //
-      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
-      // immediately notify other tasks that memory has been freed, and they may acquire the
-      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
-      // try again in the next loop iteration.
-      while (got < required) {
-        try {
-          long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
-              Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-          } else {
-            // Self-spilling could not free up any more memory.
-            break;
-          }
-        } catch (ClosedByInterruptException e) {
-          // This called by user to kill a task (e.g: speculative task).
-          logger.error("error while calling spill() on " + consumer, e);
-          throw new RuntimeException(e.getMessage());
-        } catch (IOException e) {
-          logger.error("error while calling spill() on " + consumer, e);
-          // checkstyle.off: RegexpSinglelineJava
-          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
-            + e.getMessage());
-          // checkstyle.on: RegexpSinglelineJava
-        }
-      }
-
-      consumers.add(consumer);
-      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+      consumers.add(requestingConsumer);
+      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
+              requestingConsumer);
       return got;
     }
   }
 
+  /**
+   * Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
+   * spilling and then acquiring the freed memory. If no more memory can be spilled from
+   * `cList[idx]`, remove it from the list.
+   *
+   * @return number of bytes acquired (<= requested)
+   * @throws RuntimeException if task is interrupted
+   * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+   */
+  private long trySpillAndAcquire(MemoryConsumer requestingConsumer,
+                                  long requested, List<MemoryConsumer> cList, int idx) {
+    MemoryMode mode = requestingConsumer.getMode();
+    MemoryConsumer consumerToSpill = cList.get(idx);
+    logger.debug("Task {} try to spill {} from {} for {}", taskAttemptId,
+            Utils.bytesToString(requested), consumerToSpill, requestingConsumer);
+    try {
+      long released = consumerToSpill.spill(requested, requestingConsumer);
+      if (released > 0) {
+        logger.debug("Task {} released {} of requested {} from {} for {}", taskAttemptId,

Review comment:
       `released` -> `spilled`?

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
-          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            long key = c.getUsed();
+          if (c.getUsed() > 0 && c.getMode() == mode) {
+            long key = c == requestingConsumer ? 0 : c.getUsed();
             List<MemoryConsumer> list =
                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
             list.add(c);
           }
         }
-        while (!sortedConsumers.isEmpty()) {
+        // Iteratively spill consumers until we've freed enough memory or run out of consumers.
+        while (got < required && !sortedConsumers.isEmpty()) {
           // Get the consumer using the least memory more than the remaining required memory.
           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
             sortedConsumers.ceilingEntry(required - got);
-          // No consumer has used memory more than the remaining required memory.
-          // Get the consumer of largest used memory.
+          // No consumer has enough memory on its own, start with spilling the biggest consumer.
           if (currentEntry == null) {
             currentEntry = sortedConsumers.lastEntry();
           }
           List<MemoryConsumer> cList = currentEntry.getValue();
-          MemoryConsumer c = cList.get(cList.size() - 1);
-          try {
-            long released = c.spill(required - got, consumer);
-            if (released > 0) {
-              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                Utils.bytesToString(released), c, consumer);
-              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-              if (got >= required) {
-                break;
-              }
-            } else {
-              cList.remove(cList.size() - 1);
-              if (cList.isEmpty()) {
-                sortedConsumers.remove(currentEntry.getKey());
-              }
-            }
-          } catch (ClosedByInterruptException e) {
-            // This called by user to kill a task (e.g: speculative task).
-            logger.error("error while calling spill() on " + c, e);
-            throw new RuntimeException(e.getMessage());
-          } catch (IOException e) {
-            logger.error("error while calling spill() on " + c, e);
-            // checkstyle.off: RegexpSinglelineJava
-            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
-              + e.getMessage());
-            // checkstyle.on: RegexpSinglelineJava
+          got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
           }
         }
       }
 
-      // Attempt to free up memory by self-spilling.
-      //
-      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
-      // immediately notify other tasks that memory has been freed, and they may acquire the
-      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
-      // try again in the next loop iteration.
-      while (got < required) {
-        try {
-          long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
-              Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-          } else {
-            // Self-spilling could not free up any more memory.
-            break;
-          }
-        } catch (ClosedByInterruptException e) {
-          // This called by user to kill a task (e.g: speculative task).
-          logger.error("error while calling spill() on " + consumer, e);
-          throw new RuntimeException(e.getMessage());
-        } catch (IOException e) {
-          logger.error("error while calling spill() on " + consumer, e);
-          // checkstyle.off: RegexpSinglelineJava
-          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
-            + e.getMessage());
-          // checkstyle.on: RegexpSinglelineJava
-        }
-      }
-
-      consumers.add(consumer);
-      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+      consumers.add(requestingConsumer);
+      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
+              requestingConsumer);
       return got;
     }
   }
 
+  /**
+   * Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
+   * spilling and then acquiring the freed memory. If no more memory can be spilled from
+   * `cList[idx]`, remove it from the list.
+   *
+   * @return number of bytes acquired (<= requested)
+   * @throws RuntimeException if task is interrupted
+   * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+   */
+  private long trySpillAndAcquire(MemoryConsumer requestingConsumer,
+                                  long requested, List<MemoryConsumer> cList, int idx) {

Review comment:
       ```suggestion
     private long trySpillAndAcquire(
         MemoryConsumer requestingConsumer,
         long requested, List<MemoryConsumer> cList,
         int idx) {
   ```

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
-          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            long key = c.getUsed();
+          if (c.getUsed() > 0 && c.getMode() == mode) {
+            long key = c == requestingConsumer ? 0 : c.getUsed();
             List<MemoryConsumer> list =
                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
             list.add(c);
           }
         }
-        while (!sortedConsumers.isEmpty()) {
+        // Iteratively spill consumers until we've freed enough memory or run out of consumers.
+        while (got < required && !sortedConsumers.isEmpty()) {
           // Get the consumer using the least memory more than the remaining required memory.
           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
             sortedConsumers.ceilingEntry(required - got);
-          // No consumer has used memory more than the remaining required memory.
-          // Get the consumer of largest used memory.
+          // No consumer has enough memory on its own, start with spilling the biggest consumer.
           if (currentEntry == null) {
             currentEntry = sortedConsumers.lastEntry();
           }
           List<MemoryConsumer> cList = currentEntry.getValue();
-          MemoryConsumer c = cList.get(cList.size() - 1);
-          try {
-            long released = c.spill(required - got, consumer);
-            if (released > 0) {
-              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                Utils.bytesToString(released), c, consumer);
-              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-              if (got >= required) {
-                break;
-              }
-            } else {
-              cList.remove(cList.size() - 1);
-              if (cList.isEmpty()) {
-                sortedConsumers.remove(currentEntry.getKey());
-              }
-            }
-          } catch (ClosedByInterruptException e) {
-            // This called by user to kill a task (e.g: speculative task).
-            logger.error("error while calling spill() on " + c, e);
-            throw new RuntimeException(e.getMessage());
-          } catch (IOException e) {
-            logger.error("error while calling spill() on " + c, e);
-            // checkstyle.off: RegexpSinglelineJava
-            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
-              + e.getMessage());
-            // checkstyle.on: RegexpSinglelineJava
+          got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
           }
         }
       }
 
-      // Attempt to free up memory by self-spilling.
-      //
-      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
-      // immediately notify other tasks that memory has been freed, and they may acquire the
-      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
-      // try again in the next loop iteration.
-      while (got < required) {
-        try {
-          long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
-              Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-          } else {
-            // Self-spilling could not free up any more memory.
-            break;
-          }
-        } catch (ClosedByInterruptException e) {
-          // This called by user to kill a task (e.g: speculative task).
-          logger.error("error while calling spill() on " + consumer, e);
-          throw new RuntimeException(e.getMessage());
-        } catch (IOException e) {
-          logger.error("error while calling spill() on " + consumer, e);
-          // checkstyle.off: RegexpSinglelineJava
-          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
-            + e.getMessage());
-          // checkstyle.on: RegexpSinglelineJava
-        }
-      }
-
-      consumers.add(consumer);
-      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+      consumers.add(requestingConsumer);
+      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
+              requestingConsumer);
       return got;
     }
   }
 
+  /**
+   * Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
+   * spilling and then acquiring the freed memory. If no more memory can be spilled from
+   * `cList[idx]`, remove it from the list.
+   *
+   * @return number of bytes acquired (<= requested)
+   * @throws RuntimeException if task is interrupted
+   * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+   */
+  private long trySpillAndAcquire(MemoryConsumer requestingConsumer,
+                                  long requested, List<MemoryConsumer> cList, int idx) {
+    MemoryMode mode = requestingConsumer.getMode();
+    MemoryConsumer consumerToSpill = cList.get(idx);
+    logger.debug("Task {} try to spill {} from {} for {}", taskAttemptId,
+            Utils.bytesToString(requested), consumerToSpill, requestingConsumer);
+    try {
+      long released = consumerToSpill.spill(requested, requestingConsumer);
+      if (released > 0) {
+        logger.debug("Task {} released {} of requested {} from {} for {}", taskAttemptId,
+                Utils.bytesToString(released), Utils.bytesToString(requested), consumerToSpill,
+                requestingConsumer);

Review comment:
       2 indents




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r730878231



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
     assert(required >= 0);
-    assert(consumer != null);
-    MemoryMode mode = consumer.getMode();
+    assert(requestingConsumer != null);
+    MemoryMode mode = requestingConsumer.getMode();
     // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
     // memory here, then it may not make sense to spill since that would only end up freeing
     // off-heap memory. This is subject to change, though, so it may be risky to make this
     // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
+      consumers.add(requestingConsumer);
       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

Review comment:
       In the beginning, I noticed there's a behavior difference if we still adding the request consumer at the end - that is we will not do self-spill on the request consumer if this's the first time the consumer calls `acquireExecutionMemory`. However, later, I realized that it's actually a no-op if we do self-spill on the request consumer at the first time call.  Therefore, I agree with you @mridulm . Keeping add the consumer at the end should still be fine.
   
    




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] timarmstrong commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
timarmstrong commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-946260848


   Thank you for the reviews everyone!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] srowen commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
srowen commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936667884


   Jenkins test this please


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34186: [SPARK-36933][CORE] clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-934994442


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] timarmstrong commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
timarmstrong commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r729953747



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
     assert(required >= 0);
-    assert(consumer != null);
-    MemoryMode mode = consumer.getMode();
+    assert(requestingConsumer != null);
+    MemoryMode mode = requestingConsumer.getMode();
     // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
     // memory here, then it may not make sense to spill since that would only end up freeing
     // off-heap memory. This is subject to change, though, so it may be risky to make this
     // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
+      consumers.add(requestingConsumer);
       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

Review comment:
       I agree that both versions are equivalent. My original PR didn't make this change, but it seemed like moving it earlier in the method would make it more obviously correct. See discussion - https://github.com/apache/spark/pull/34186#discussion_r726905638
   
   I don't feel strongly either way.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936755118


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48396/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936675903


   **[Test build #143885 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143885/testReport)** for PR 34186 at commit [`e694349`](https://github.com/apache/spark/commit/e694349dc942115c6d873a8f2607596cba31f419).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] timarmstrong commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
timarmstrong commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r728282858



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {

Review comment:
       You are right - it will not be in there if this is the first time it has called `acquireExecutionMemory()`, but in that case we don't need to consider spilling it. I'll extend the comment to clarify.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936887619


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48396/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] timarmstrong commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
timarmstrong commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r731153656



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
     assert(required >= 0);
-    assert(consumer != null);
-    MemoryMode mode = consumer.getMode();
+    assert(requestingConsumer != null);
+    MemoryMode mode = requestingConsumer.getMode();
     // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
     // memory here, then it may not make sense to spill since that would only end up freeing
     // off-heap memory. This is subject to change, though, so it may be risky to make this
     // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
+      consumers.add(requestingConsumer);
       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

Review comment:
       The `c.getUsed() > 0` check also should prevent the requesting consumer being added to the map on the first call.
   
   It sounds like there's a consensus that we add it at the end, like the original code? I'll revert my change then.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r731424378



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
     assert(required >= 0);
-    assert(consumer != null);
-    MemoryMode mode = consumer.getMode();
+    assert(requestingConsumer != null);
+    MemoryMode mode = requestingConsumer.getMode();
     // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
     // memory here, then it may not make sense to spill since that would only end up freeing
     // off-heap memory. This is subject to change, though, so it may be risky to make this
     // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
+      consumers.add(requestingConsumer);
       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

Review comment:
       Thanks @timarmstrong 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] JoshRosen closed pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
JoshRosen closed pull request #34186:
URL: https://github.com/apache/spark/pull/34186


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] JoshRosen commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r730191882



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
     assert(required >= 0);
-    assert(consumer != null);
-    MemoryMode mode = consumer.getMode();
+    assert(requestingConsumer != null);
+    MemoryMode mode = requestingConsumer.getMode();
     // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
     // memory here, then it may not make sense to spill since that would only end up freeing
     // off-heap memory. This is subject to change, though, so it may be risky to make this
     // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
+      consumers.add(requestingConsumer);
       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
 
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+          Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //   spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer (if present) a nominal memory usage of 0 so that it is always last in priority
+        // order. The map will include all consumers that have previously acquired memory.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {
-          if (c != consumer && c.getUsed() > 0 && c.getMode() == mode) {
-            long key = c.getUsed();
+          if (c.getUsed() > 0 && c.getMode() == mode) {
+            long key = c == requestingConsumer ? 0 : c.getUsed();
             List<MemoryConsumer> list =
                 sortedConsumers.computeIfAbsent(key, k -> new ArrayList<>(1));
             list.add(c);
           }
         }
-        while (!sortedConsumers.isEmpty()) {
+        // Iteratively spill consumers until we've freed enough memory or run out of consumers.
+        while (got < required && !sortedConsumers.isEmpty()) {
           // Get the consumer using the least memory more than the remaining required memory.
           Map.Entry<Long, List<MemoryConsumer>> currentEntry =
             sortedConsumers.ceilingEntry(required - got);
-          // No consumer has used memory more than the remaining required memory.
-          // Get the consumer of largest used memory.
+          // No consumer has enough memory on its own, start with spilling the biggest consumer.
           if (currentEntry == null) {
             currentEntry = sortedConsumers.lastEntry();
           }
           List<MemoryConsumer> cList = currentEntry.getValue();
-          MemoryConsumer c = cList.get(cList.size() - 1);
-          try {
-            long released = c.spill(required - got, consumer);
-            if (released > 0) {
-              logger.debug("Task {} released {} from {} for {}", taskAttemptId,
-                Utils.bytesToString(released), c, consumer);
-              got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-              if (got >= required) {
-                break;
-              }
-            } else {
-              cList.remove(cList.size() - 1);
-              if (cList.isEmpty()) {
-                sortedConsumers.remove(currentEntry.getKey());
-              }
-            }
-          } catch (ClosedByInterruptException e) {
-            // This called by user to kill a task (e.g: speculative task).
-            logger.error("error while calling spill() on " + c, e);
-            throw new RuntimeException(e.getMessage());
-          } catch (IOException e) {
-            logger.error("error while calling spill() on " + c, e);
-            // checkstyle.off: RegexpSinglelineJava
-            throw new SparkOutOfMemoryError("error while calling spill() on " + c + " : "
-              + e.getMessage());
-            // checkstyle.on: RegexpSinglelineJava
+          got += trySpillAndAcquire(requestingConsumer, required - got, cList, cList.size() - 1);
+          if (cList.isEmpty()) {
+            sortedConsumers.remove(currentEntry.getKey());
           }
         }
       }
 
-      // Attempt to free up memory by self-spilling.
-      //
-      // When our spill handler releases memory, `ExecutionMemoryPool#releaseMemory()` will
-      // immediately notify other tasks that memory has been freed, and they may acquire the
-      // newly-freed memory before we have a chance to do so (SPARK-35486). In that case, we will
-      // try again in the next loop iteration.
-      while (got < required) {
-        try {
-          long released = consumer.spill(required - got, consumer);
-          if (released > 0) {
-            logger.debug("Task {} released {} from itself ({})", taskAttemptId,
-              Utils.bytesToString(released), consumer);
-            got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
-          } else {
-            // Self-spilling could not free up any more memory.
-            break;
-          }
-        } catch (ClosedByInterruptException e) {
-          // This called by user to kill a task (e.g: speculative task).
-          logger.error("error while calling spill() on " + consumer, e);
-          throw new RuntimeException(e.getMessage());
-        } catch (IOException e) {
-          logger.error("error while calling spill() on " + consumer, e);
-          // checkstyle.off: RegexpSinglelineJava
-          throw new SparkOutOfMemoryError("error while calling spill() on " + consumer + " : "
-            + e.getMessage());
-          // checkstyle.on: RegexpSinglelineJava
-        }
-      }
-
-      consumers.add(consumer);
-      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), consumer);
+      logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got),
+              requestingConsumer);
       return got;
     }
   }
 
+  /**
+   * Try to acquire as much memory as possible from `cList[idx]`, up to `requested` bytes by
+   * spilling and then acquiring the freed memory. If no more memory can be spilled from
+   * `cList[idx]`, remove it from the list.
+   *
+   * @return number of bytes acquired (<= requested)
+   * @throws RuntimeException if task is interrupted
+   * @throws SparkOutOfMemoryError if an IOException occurs during spilling
+   */
+  private long trySpillAndAcquire(
+      MemoryConsumer requestingConsumer,
+      long requested, List<MemoryConsumer> cList,

Review comment:
       ```suggestion
         long requested,
         List<MemoryConsumer> cList,
   ```

##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
     assert(required >= 0);
-    assert(consumer != null);
-    MemoryMode mode = consumer.getMode();
+    assert(requestingConsumer != null);
+    MemoryMode mode = requestingConsumer.getMode();
     // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
     // memory here, then it may not make sense to spill since that would only end up freeing
     // off-heap memory. This is subject to change, though, so it may be risky to make this
     // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
+      consumers.add(requestingConsumer);
       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

Review comment:
       If I understand correctly, we're debating whether to add to the list of memory consumers either at the start of the method (here) or at the very end (right before returning `got`).
   
   As @timarmstrong's notes at https://github.com/apache/spark/pull/34186#discussion_r729523404, this only makes a difference when we have never previously called `acquireExecutionMemory` for this `requestingConsumer`.
   
   Adding the consumer at the beginning of the method means that we might try to self-spill a `requestingConsumer` which has never requested memory before and therefore is using no memory. I took at look at the existing `MemoryConsumer.spill()` implementations in Spark to see if there's any cases where this might be a problem:
   
   - RowBasedKeyValueBatch and HashedRelation's spill methods are always no-ops.
   - `RowQueue`, `Spillable` and `BytesToBytesMap` spill is always a no-op for self spills.
   -ShuffleExternalSorter and UnsafeExternalSorter's spill methods are no-ops if those consumers haven't acquired any memory.
   - `TestMemoryConsumer` and its subclasses look like they'll be fine. There, we have: https://github.com/apache/spark/blob/722ac1b8b7f86fdeedf20cc11c7f547e7038029c/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java#L34-L39
      If it hasn't acquired any memory, we'll call `free(0)`, which will result in a series of calls that eventually flow to https://github.com/apache/spark/blob/722ac1b8b7f86fdeedf20cc11c7f547e7038029c/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L152 where it looks like things will work correctly.
   
   Given all of that, I think having the call earlier in the method won't cause problems for the existing consumers defined in Spark. I suppose it could potentially impact custom consumers defined outside of Spark, though. 
   
   If we're trying to be as cautious and faithful to the old behavior as possible then maybe it's simpler to put it back at the end. I don't feel super strongly about this, though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] JoshRosen commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-946194930


   This looks good to me (including the latest edit to place the memory consumer registration back at its original location at the end of the method), so I'm going to merge this to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] timarmstrong commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
timarmstrong commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-942538533


   Thanks for the review and sorry for the formatting issues - are there any tools that can help get the right java formatting automatically?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r728555103



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {

Review comment:
       > but in that case we don't need to consider spilling it.
   
   Why we don't? I think we always (even for the first time) do self-spilling originally when there's insufficient free memory:
   https://github.com/apache/spark/blob/5ac76d9cb45d58eeb4253d50e90060a68c3e87cb/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java#L205-L211
   
   
   (If my understanding is correct, I think we could do `consumers.add(consumer)` earlier to resolve the problem.
   
   
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936668878


   **[Test build #143883 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143883/testReport)** for PR 34186 at commit [`e694349`](https://github.com/apache/spark/commit/e694349dc942115c6d873a8f2607596cba31f419).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936998080






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936887619


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/48396/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-934994442


   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-942853057


   I don't know if Spark has such auto tool but has `dev/lint-java` to check fmt. But, IIRC, things like indentation are not counted. And Spark follows  the style of [databricks/scala-style-guide](https://github.com/databricks/scala-style-guide#spacing-and-indentation). You could check it there.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] timarmstrong commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
timarmstrong commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r728282858



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -149,95 +149,100 @@ public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
       // Try to release memory from other consumers first, then we can reduce the frequency of
       // spilling, avoid to have too many spilled files.
       if (got < required) {
-        // Call spill() on other consumers to release memory
-        // Sort the consumers according their memory usage. So we avoid spilling the same consumer
-        // which is just spilled in last few times and re-spilling on it will produce many small
-        // spill files.
+        logger.debug("Task {} need to spill {} for {}", taskAttemptId,
+                Utils.bytesToString(required - got), requestingConsumer);
+        // We need to call spill() on consumers to free up more memory. We want to optimize for two
+        // things:
+        // * Minimize the number of spill calls, to reduce the number of spill files and avoid small
+        //  spill files.
+        // * Avoid spilling more data than necessary - if we only need a little more memory, we may
+        //   not want to spill as much data as possible. Many consumers spill more than the
+        //   requested amount, so we can take that into account in our decisions.
+        // We use a heuristic that selects the smallest memory consumer with at least `required`
+        // bytes of memory in an attempt to balance these factors. It may work well if there are
+        // fewer larger requests, but can result in many small spills if there are many smaller
+        // requests.
+
+        // Build a map of consumer in order of memory usage to prioritize spilling. Assign current
+        // consumer a nominal memory usage of 0 so that it is always last in priority order.
         TreeMap<Long, List<MemoryConsumer>> sortedConsumers = new TreeMap<>();
         for (MemoryConsumer c: consumers) {

Review comment:
       You are right - it will not be in there if this is the first time it has called `acquireExecutionMemory()`, but in that case we don't need to consider spilling it. I'll extend the comment to clarify.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936833079


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/48396/
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936954063


   **[Test build #143885 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143885/testReport)** for PR 34186 at commit [`e694349`](https://github.com/apache/spark/commit/e694349dc942115c6d873a8f2607596cba31f419).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r729966969



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
     assert(required >= 0);
-    assert(consumer != null);
-    MemoryMode mode = consumer.getMode();
+    assert(requestingConsumer != null);
+    MemoryMode mode = requestingConsumer.getMode();
     // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
     // memory here, then it may not make sense to spill since that would only end up freeing
     // off-heap memory. This is subject to change, though, so it may be risky to make this
     // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
+      consumers.add(requestingConsumer);
       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

Review comment:
       @Ngone51 Is there anything I am missing above ? Do we necessarily need to proactively add it to `consumers` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] JoshRosen commented on a change in pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on a change in pull request #34186:
URL: https://github.com/apache/spark/pull/34186#discussion_r730198470



##########
File path: core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
##########
@@ -135,109 +135,117 @@ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
    *
    * @return number of bytes successfully granted (<= N).
    */
-  public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+  public long acquireExecutionMemory(long required, MemoryConsumer requestingConsumer) {
     assert(required >= 0);
-    assert(consumer != null);
-    MemoryMode mode = consumer.getMode();
+    assert(requestingConsumer != null);
+    MemoryMode mode = requestingConsumer.getMode();
     // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
     // memory here, then it may not make sense to spill since that would only end up freeing
     // off-heap memory. This is subject to change, though, so it may be risky to make this
     // optimization now in case we forget to undo it late when making changes.
     synchronized (this) {
+      consumers.add(requestingConsumer);
       long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);

Review comment:
       If I understand correctly, we're debating whether to add to the list of memory consumers either at the start of the method (here) or at the very end (right before returning `got`).
   
   As @timarmstrong's notes at https://github.com/apache/spark/pull/34186#discussion_r729523404, this only makes a difference when we have never previously called `acquireExecutionMemory` for this `requestingConsumer`.
   
   Adding the consumer at the beginning of the method means that we might try to self-spill a `requestingConsumer` which has never requested memory before and therefore is using no memory. I took at look at the existing `MemoryConsumer.spill()` implementations in Spark to see if there's any cases where this might be a problem:
   
   - RowBasedKeyValueBatch and HashedRelation's spill methods are always no-ops.
   - `RowQueue`, `Spillable` and `BytesToBytesMap` spill is always a no-op for self spills.
   - ShuffleExternalSorter and UnsafeExternalSorter's spill methods are no-ops if those consumers haven't acquired any memory.
   - `TestMemoryConsumer` and its subclasses look like they'll be fine. There, we have: https://github.com/apache/spark/blob/722ac1b8b7f86fdeedf20cc11c7f547e7038029c/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java#L34-L39
      If it hasn't acquired any memory, we'll call `free(0)`, which will result in a series of calls that eventually flow to https://github.com/apache/spark/blob/722ac1b8b7f86fdeedf20cc11c7f547e7038029c/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala#L152 where it looks like things will work correctly.
   
   Given all of that, I think having the call earlier in the method won't cause problems for the existing consumers defined in Spark. I suppose it could potentially impact custom consumers defined outside of Spark, though. 
   
   If we're trying to be as cautious and faithful to the old behavior as possible then maybe it's simpler to put it back at the end. I don't feel super strongly about this, though.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #34186: [SPARK-36933][CORE] Clean up TaskMemoryManager.acquireExecutionMemory()

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #34186:
URL: https://github.com/apache/spark/pull/34186#issuecomment-936668878


   **[Test build #143883 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/143883/testReport)** for PR 34186 at commit [`e694349`](https://github.com/apache/spark/commit/e694349dc942115c6d873a8f2607596cba31f419).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org