Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/26 03:52:17 UTC

[GitHub] [spark] sandeepvinayak opened a new pull request, #36680: SPARK-39283: Fixing the deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

sandeepvinayak opened a new pull request, #36680:
URL: https://github.com/apache/spark/pull/36680

   We are facing a deadlock between `TaskMemoryManager` and `UnsafeExternalSorter.SpillableIterator` during a join. In the `UnsafeExternalSorter.SpillableIterator#spill()` function, the thread takes the locks on `SpillableIterator` and `UnsafeExternalSorter`, then calls `freePage` to free all allocated pages except the last one; `freePage` in turn takes the lock on `TaskMemoryManager`.
   At the same time, another `MemoryConsumer` that uses `UnsafeExternalSorter` for sorting can call `allocatePage`, which takes the lock on `TaskMemoryManager` and may trigger a spill, which in turn requires the lock on `UnsafeExternalSorter`. The two threads acquire the same locks in opposite order, causing a deadlock.
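   
   For illustration, here is a minimal, self-contained sketch of that inverted lock order (the two monitors are toy stand-ins for `TaskMemoryManager` and the sorter/iterator, not the actual Spark classes):
   
   ```java
   public class LockOrderDeadlockSketch {
     static final Object tmm = new Object();     // stands in for TaskMemoryManager
     static final Object sorter = new Object();  // stands in for the sorter/iterator
   
     public static void main(String[] args) {
       // Models SpillableIterator.spill(): sorter lock first, then TMM lock (via freePage).
       Thread spillThread = new Thread(() -> {
         synchronized (sorter) {
           pause();
           synchronized (tmm) { /* freePage(...) */ }
         }
       });
       // Models allocatePage(): TMM lock first, then sorter lock (via the triggered spill).
       Thread allocThread = new Thread(() -> {
         synchronized (tmm) {
           pause();
           synchronized (sorter) { /* trigger.spill(...) */ }
         }
       });
       spillThread.start();
       allocThread.start();  // the pause widens the race window; both threads block forever
     }
   
     private static void pause() {
       try { Thread.sleep(100); } catch (InterruptedException ignored) { }
     }
   }
   ```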
   
   There is a similar fix here as well:
   https://issues.apache.org/jira/browse/SPARK-27338
   
   




[GitHub] [spark] cloud-fan commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1139591388

   @sandeepvinayak can you re-trigger the github action jobs?




[GitHub] [spark] JoshRosen commented on a diff in pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on code in PR #36680:
URL: https://github.com/apache/spark/pull/36680#discussion_r883157576


##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:
##########
@@ -578,58 +580,70 @@ public long getCurrentPageNumber() {
     }
 
     public long spill() throws IOException {
-      synchronized (this) {
-        if (inMemSorter == null) {
-          return 0L;
-        }
-
-        long currentPageNumber = upstream.getCurrentPageNumber();
+      List<MemoryBlock> pagesToFree = Lists.newArrayList();
+      try {
+        synchronized (this) {
+          if (inMemSorter == null) {
+            return 0L;
+          }
 
-        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
-        if (numRecords > 0) {
-          // Iterate over the records that have not been returned and spill them.
-          final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(
-                  blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-          spillIterator(upstream, spillWriter);
-          spillWriters.add(spillWriter);
-          upstream = spillWriter.getReader(serializerManager);
-        } else {
-          // Nothing to spill as all records have been read already, but do not return yet, as the
-          // memory still has to be freed.
-          upstream = null;
-        }
+          long currentPageNumber = upstream.getCurrentPageNumber();
+
+          ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+          if (numRecords > 0) {
+            // Iterate over the records that have not been returned and spill them.
+            final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(
+                    blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
+            spillIterator(upstream, spillWriter);
+            spillWriters.add(spillWriter);
+            upstream = spillWriter.getReader(serializerManager);
+          } else {
+            // Nothing to spill as all records have been read already, but do not return yet, as the
+            // memory still has to be freed.
+            upstream = null;
+          }
 
-        long released = 0L;
-        synchronized (UnsafeExternalSorter.this) {
-          // release the pages except the one that is used. There can still be a caller that
-          // is accessing the current record. We free this page in that caller's next loadNext()
-          // call.
-          for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.pageNumber != currentPageNumber) {
-              released += page.size();
-              freePage(page);
-            } else {
-              lastPage = page;
+          long released = 0L;
+          synchronized (UnsafeExternalSorter.this) {
+            // release the pages except the one that is used. There can still be a caller that
+            // is accessing the current record. We free this page in that caller's next loadNext()
+            // call.
+            for (MemoryBlock page : allocatedPages) {
+              if (!loaded || page.pageNumber != currentPageNumber) {
+                released += page.size();
+                pagesToFree.add(page);
+              } else {
+                lastPage = page;
+              }
+            }
+            allocatedPages.clear();
+            if (lastPage != null) {
+              // Add the last page back to the list of allocated pages to make sure it gets freed in
+              // case loadNext() never gets called again.
+              allocatedPages.add(lastPage);
             }
           }
-          allocatedPages.clear();
-          if (lastPage != null) {
-            // Add the last page back to the list of allocated pages to make sure it gets freed in
-            // case loadNext() never gets called again.
-            allocatedPages.add(lastPage);
-          }
-        }
 
-        // in-memory sorter will not be used after spilling
-        assert(inMemSorter != null);
-        released += inMemSorter.getMemoryUsage();
-        totalSortTimeNanos += inMemSorter.getSortTimeNanos();
-        inMemSorter.freeMemory();
-        inMemSorter = null;
-        taskContext.taskMetrics().incMemoryBytesSpilled(released);
-        taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
-        totalSpillBytes += released;
-        return released;
+          // in-memory sorter will not be used after spilling
+          assert (inMemSorter != null);
+          released += inMemSorter.getMemoryUsage();
+          totalSortTimeNanos += inMemSorter.getSortTimeNanos();
+          inMemSorter.freeMemory();
+          inMemSorter = null;
+          taskContext.taskMetrics().incMemoryBytesSpilled(released);
+          taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
+          totalSpillBytes += released;
+          return released;
+        }
+      } finally {
+        // Do not free the page, while we are locking `SpillableIterator`. The `freePage`
+        // method locks the `TaskMemoryManager`, and it's not a good idea to lock 2 objects in
+        // sequence. We may hit dead lock if another thread locks `TaskMemoryManager` and
+        // `SpillableIterator` in sequence, which may happen in
+        // `TaskMemoryManager.acquireExecutionMemory`.

Review Comment:
   Minor nit: for consistency with the `loadNext()` method (where this comment was copied from), do you mind moving this comment up so that it's located right before the `pagesToFree.add(page)` call on line 614? As worded, I think the comment makes more sense at that location.



##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:
##########
@@ -578,58 +580,70 @@ public long getCurrentPageNumber() {
     }
 
     public long spill() throws IOException {
-      synchronized (this) {
-        if (inMemSorter == null) {
-          return 0L;
-        }
-
-        long currentPageNumber = upstream.getCurrentPageNumber();
+      List<MemoryBlock> pagesToFree = Lists.newArrayList();

Review Comment:
   Minor nit: Guava says that `Lists.newArrayList` should be treated as deprecated in Java 7+: https://guava.dev/releases/21.0/api/docs/com/google/common/collect/Lists.html#newArrayList--
   
   To avoid us having to update this code in the future, could you change this to 
   
   ```
   ArrayList<MemoryBlock> pagesToFree = new ArrayList<>();
   ```
   
   and update the imports?





[GitHub] [spark] cloud-fan commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1140912502

   @JoshRosen can you do the final signoff?
   
   cc @MaxGekk since RC3 failed, can we wait for this deadlock fix to be merged?




[GitHub] [spark] JoshRosen commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1140131117

   Thinking about this even more, I'm not sure this fixes 100% of the possible deadlocks here (although it is an improvement over the status quo):
   
   `UnsafeExternalSorter.SpillableIterator.spill()` is called from a single place, `UnsafeExternalSorter.spill(long size, MemoryConsumer trigger)`. That `UnsafeExternalSorter` spill method is called in a few different places:
   
   - (1) `TaskMemoryManager.trySpillAndAcquire`, which is called only from `TaskMemoryManager.acquireExecutionMemory`. The `acquireExecutionMemory` method synchronizes on `TaskMemoryManager.this`. As a result, any calls of `SpillableIterator.spill()` which originate from this code path will already be holding the `TaskMemoryManager` lock, so in that scenario the changes in this PR wouldn't impact the set of locks that are simultaneously held (since the lock acquisitions that we removed would have been acquisitions for locks that were already held).
   - (2) It's also called from `UnsafeExternalSorter.createWithExistingInMemorySorter`. The TMM lock isn't held while calling this method, so this patch's changes would actually change the locking.
   - (3) It's also called from the zero-argument `spill()`, which is syntactic sugar for a self-spill. That method is called in several places, most (or all?) of which won't be holding the TMM lock.
   
   In cases (2) and (3) I think the changes here would help to avoid deadlocks. I don't think it would fix deadlocks that arise from situation (1) unless those deadlocks were occurring between a thread in scenario (1) and a different thread in scenarios (2) or (3).
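   
   (For reference, intrinsic Java monitors are reentrant: a thread that already holds the TMM lock can re-acquire it without blocking. That is why, in case (1), the acquisitions this PR removes were effectively no-ops and the locking behavior on that path is unchanged. A trivial standalone sketch, unrelated to the Spark classes:)
   
   ```java
   public class ReentrancySketch {
     public static void main(String[] args) {
       final Object tmm = new Object();
       synchronized (tmm) {    // models acquireExecutionMemory already holding the TMM lock
         synchronized (tmm) {  // models a nested freePage-style re-acquisition
           System.out.println("re-entered the same monitor without blocking");
         }
       }
     }
   }
   ```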
   
   ---
   
   As an example, here's a pair of stacktraces from a similar deadlock that I observed in the wild:
   
   
   ```
    Thread 406 stdout writer for /databricks/python/bin/python BLOCKED
   Blocked by Thread 445 Lock(org.apache.spark.memory.TaskMemoryManager@686434476})
   Holding Monitor(org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@1542245801})
   
   org.apache.spark.memory.TaskMemoryManager.freePage(TaskMemoryManager.java:346)
   org.apache.spark.memory.MemoryConsumer.freePage(MemoryConsumer.java:136)
   org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.freeMemory(UnsafeExternalSorter.java:333)
   org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.cleanupResources(UnsafeExternalSorter.java:361) => holding Monitor(org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@1542245801})
   org.apache.spark.sql.execution.UnsafeExternalRowSorter.cleanupResources(UnsafeExternalRowSorter.java:182)
   org.apache.spark.sql.execution.UnsafeExternalRowSorter$1.advanceNext(UnsafeExternalRowSorter.java:213)
   org.apache.spark.sql.execution.RowIteratorToScala.hasNext(RowIterator.scala:68)
   org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:147)
   org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.smj_findNextInnerJoinRows_0$(Unknown Source)
   org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage7.processNext(Unknown Source)
   org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
   org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:776)
   scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
   org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
   scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
   org.apache.spark.sql.execution.python.BatchIterator$$anon$1.hasNext(ArrowEvalPythonExec.scala:47)
   org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:109)
   org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread$$Lambda$6693/1657363063.apply$mcV$sp(Unknown Source)
   scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1646)
   org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.writeIteratorToStream(ArrowPythonRunner.scala:132)
   org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:520)
   org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$6583/922362037.apply(Unknown Source)
   org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2230)
   org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:312)
   ```
   
   and
   
   ```
    Thread 455 stdout writer for /databricks/python/bin/python BLOCKED
   Blocked by Thread 406 Lock(org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@1542245801})
   Holding Monitor(org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator@1022993226}), Monitor(org.apache.spark.memory.TaskMemoryManager@686434476})
   
   org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator.spill(UnsafeExternalSorter.java:636) => holding Monitor(org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter$SpillableIterator@1022993226})
   org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:206)
   org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:177) => holding Monitor(org.apache.spark.memory.TaskMemoryManager@686434476})
   org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:297)
   org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:123)
   org.apache.spark.sql.execution.python.HybridRowQueue.createNewQueue(RowQueue.scala:227)
   org.apache.spark.sql.execution.python.HybridRowQueue.add(RowQueue.scala:250)
   org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$10(EvalPythonExec.scala:125)
   org.apache.spark.sql.execution.python.EvalPythonExec$$Lambda$6418/1950569834.apply(Unknown Source)
   scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
   org.apache.spark.sql.execution.python.BatchIterator$$anon$1.next(ArrowEvalPythonExec.scala:54)
   org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.$anonfun$writeIteratorToStream$1(ArrowPythonRunner.scala:110)
   org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread$$Lambda$6693/1657363063.apply$mcV$sp(Unknown Source)
   scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
   org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1646)
   org.apache.spark.sql.execution.python.ArrowPythonRunner$ArrowWriterThread.writeIteratorToStream(ArrowPythonRunner.scala:132)
   org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:520)
   org.apache.spark.api.python.BasePythonRunner$WriterThread$$Lambda$6583/922362037.apply(Unknown Source)
   org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2230)
   org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:312)
   ```
   
   In this case, the deadlock occurs between a `cleanupResources()` call and a `spill()` call. The spill call locks the TMM and then the sorter, whereas the `cleanupResources()` call first locks the sorter and then calls methods that lock the TMM, resulting in a circular wait. I don't think that specific deadlock would be fixed by this patch's changes.
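   
   For what it's worth, a generic way to confirm this kind of circular wait at runtime (a plain JVM diagnostic, not something the Spark code does itself) is to ask the JVM for monitor-deadlocked threads:
   
   ```java
   import java.lang.management.ManagementFactory;
   import java.lang.management.ThreadInfo;
   import java.lang.management.ThreadMXBean;
   
   public class DeadlockDetector {
     public static void main(String[] args) {
       ThreadMXBean bean = ManagementFactory.getThreadMXBean();
       // Returns null when no threads are deadlocked on monitors or ownable synchronizers.
       long[] ids = bean.findDeadlockedThreads();
       if (ids != null) {
         // Print each deadlocked thread with the lock it is blocked on and its stack trace.
         for (ThreadInfo info : bean.getThreadInfo(ids, Integer.MAX_VALUE)) {
           System.out.print(info);
         }
       }
     }
   }
   ```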
   
   @sandeepvinayak, do you happen to have full stacktraces from the deadlock you reported in https://issues.apache.org/jira/browse/SPARK-39283 (beyond just the information shown in the screenshot)? I'm curious to know more about the specific deadlock you saw.




[GitHub] [spark] mridulm commented on pull request #36680: SPARK-39283: Fixing the deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1138161910

   +CC @xkrogen 




[GitHub] [spark] sandeepvinayak commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1140132804

   @JoshRosen Unfortunately, we don't have the server logs at this point; I will definitely try to capture full stack traces the next time the deadlock occurs. I will also take another look based on your comments and revise the PR.




[GitHub] [spark] JoshRosen closed pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
JoshRosen closed pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator
URL: https://github.com/apache/spark/pull/36680




[GitHub] [spark] sandeepvinayak commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1140128160

   Good catch @JoshRosen, I believe we can do it without a local `inMemSorterToFree` by moving the `inMemSorter.freeMemory()` call into the `finally` block. WDYT?
   
   ```java
         finally {
           for (MemoryBlock pageToFree : pagesToFree) {
             freePage(pageToFree);
           }
           if (inMemSorter != null) {
             inMemSorter.freeMemory();
             inMemSorter = null;
           }
         }
   ```




[GitHub] [spark] sandeepvinayak commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1140203155

   @JoshRosen I just took another look at the code. The fix I made targets the deadlock between `TaskMemoryManager` and `UnsafeExternalSorter.SpillableIterator`, which is what we hit, and that's why I focused on the locks in `SpillableIterator`. But I understand there may be other gaps, and it seems right to fix those as part of this PR. I will update my PR to cover `cleanupResources()`. Thanks for your review again!




[GitHub] [spark] JoshRosen commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1140134040

   There's nothing that we can do to change the lock acquisition order from a different task thread calling `spill()`: in that case the spill-initiating thread will always first acquire the TMM lock, then the spill iterator lock, then the sorter lock.
   
   As a result, if we want to completely prevent deadlocks we need to ensure that no other method acquires both locks in the opposite order. Let's look through all of the uses of `synchronized` in this file and check them case-by-case:
   
   1. `loadNext()` is okay because it doesn't allocate or free memory while holding the `SpillableIterator` lock.
   2. `cleanupResources()` is **not** okay because it locks the sorter and then the TMM.
   3. Before this PR, self-`spill()`s were not okay, but after your changes they are.
   
   Those are the only three methods with synchronization. If we can fix the `cleanupResources()` case then I think we'll be completely free of deadlocks.
   
   Here's some code from a work-in-progress patch that I started developing before your PR was opened:
   
   ```diff
   diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
   index 1af032d1676..16860b6c9a9 100644
   --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
   +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
   @@ -357,16 +357,22 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       * @return the number of bytes freed.
       */
      private long freeMemory() {
   -    updatePeakMemoryUsed();
   +    LinkedList<MemoryBlock> pagesToFree = clearAllocatedPagesAndReturnPagesToFree();
        long memoryFreed = 0;
   -    for (MemoryBlock block : allocatedPages) {
   +    for (MemoryBlock block : pagesToFree) {
          memoryFreed += block.size();
          freePage(block);
        }
   +    return memoryFreed;
   +  }
   +
   +  private LinkedList<MemoryBlock> clearAllocatedPagesAndReturnPagesToFree() {
   +    updatePeakMemoryUsed();
   +    LinkedList<MemoryBlock> pagesToFree = new LinkedList<>(allocatedPages);
        allocatedPages.clear();
        currentPage = null;
        pageCursor = 0;
   -    return memoryFreed;
   +    return pagesToFree;
      }
    
      /**
   @@ -387,14 +393,26 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
       * Frees this sorter's in-memory data structures and cleans up its spill files.
       */
      public void cleanupResources() {
   +    // To avoid deadlocks, we can't call methods that lock the TaskMemoryManager
   +    // (such as various free() methods) while synchronizing on the UnsafeExternalSorter.
   +    // Instead, we will manipulate UnsafeExternalSorter state inside the synchronized
   +    // block and perform the actual free() calls outside it.
   +    UnsafeInMemorySorter inMemSorterToFree = null;
   +    LinkedList<MemoryBlock> pagesToFree;
        synchronized (this) {
          deleteSpillFiles();
   -      freeMemory();
   +      pagesToFree = clearAllocatedPagesAndReturnPagesToFree();
          if (inMemSorter != null) {
   -        inMemSorter.freeMemory();
   +        inMemSorterToFree = inMemSorter;
            inMemSorter = null;
          }
        }
   +    for (MemoryBlock block : pagesToFree) {
   +      freePage(block);
   +    }
   +    if (inMemSorterToFree != null) {
   +      inMemSorterToFree.freeMemory();
   +    }
      }
    
      /**
   ```
   
   Here the idea is to apply a similar fix in `cleanupResources()`. I think we could incorporate these changes here. I forgot to add the `try-finally`, though, so we should add that in `cleanupResources()` when incorporating these changes.
   
   I had one concern about a potential corner-case downside to my `cleanupResources()` change:
   
   > There's one potential downside to this approach: before, if a spill() call occurred while the cleanupResources() call was already in progress then the spill would block until that call completed and would be able to receive all of the memory freed by cleanupResources(). After this patch's changes, the spill() would no longer block so it might end up causing another operator to spill even though a chunk of memory was just about to be freed.
   
   I don't think we should worry about that, though: any scenario that depends on that timing would be race-condition-prone (and thus deadlock-prone) anyway, so I think we should prioritize fixing the deadlock over this vaguer concern.
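   
   Distilled to its essence, the pattern both fixes share is: stage the work inside your own lock, and take the memory-manager lock only after releasing it. A toy sketch (the `Page` class and `freePage` method here are hypothetical stand-ins, not the actual Spark types):
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   class StagedCleanupSketch {
     static class Page { }
   
     private final List<Page> allocatedPages = new ArrayList<>();
   
     public void cleanup() {
       List<Page> pagesToFree;
       synchronized (this) {
         // Only mutate our own state while holding our own lock.
         pagesToFree = new ArrayList<>(allocatedPages);
         allocatedPages.clear();
       }
       // Free the pages after releasing our lock, so this thread never holds
       // the sorter lock and the memory-manager lock at the same time.
       for (Page page : pagesToFree) {
         freePage(page);
       }
     }
   
     private void freePage(Page page) {
       // In the real code this locks the TaskMemoryManager internally.
     }
   }
   ```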




[GitHub] [spark] sandeepvinayak commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1139146525

   > deadlock occurred between one thread calling `UnsafeExternalSorter.cleanupResources()` and another thread which was calling `spill()` on UnsafeExternalSorter
   
   I believe that is where the issue is; this was the only synchronized block I found in `UnsafeExternalSorter` that could end up taking nested locks.




[GitHub] [spark] JoshRosen commented on pull request #36680: SPARK-39283: Fixing the deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1139135227

   I saw similar deadlocks in a user workload, where the deadlock occurred between one thread calling `UnsafeExternalSorter.cleanupResources()` and another thread which was calling `spill()` on `UnsafeExternalSorter`.
   
   I originally considered an approach which is somewhat similar to this, except I made the changes in `cleanupResources()`. I ended up searching for other approaches after I became concerned about whether my proposed change would result in other race conditions. I began exploring whether we can address this entire class of issues more fundamentally via changes to `TaskMemoryManager` itself, but I'm not 100% confident in the safety of those changes. I'll try to open a PR for discussion of that other proposal.
   
   In the meantime, let me take a closer look at this PR's fix. This looks potentially promising.




[GitHub] [spark] sandeepvinayak commented on a diff in pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on code in PR #36680:
URL: https://github.com/apache/spark/pull/36680#discussion_r883186851


##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:
##########
@@ -578,58 +580,70 @@ public long getCurrentPageNumber() {
     }
 
     public long spill() throws IOException {
-      synchronized (this) {
-        if (inMemSorter == null) {
-          return 0L;
-        }
-
-        long currentPageNumber = upstream.getCurrentPageNumber();
+      List<MemoryBlock> pagesToFree = Lists.newArrayList();

Review Comment:
   Done, thanks!





[GitHub] [spark] JoshRosen commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1141664634

   I'll do a final sign-off and merge midday tomorrow (Tuesday, May 31st), since today is a US holiday.




[GitHub] [spark] AmplabJenkins commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1139512149

   Can one of the admins verify this patch?




[GitHub] [spark] sandeepvinayak commented on pull request #36680: SPARK-39283: Fixing the deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1138722192

   cc @cloud-fan @srowen 




[GitHub] [spark] JoshRosen commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1140129567

   > I believe we can do it without having a local `inMemSorterToFree`
   
   I considered that but didn't suggest it because I thought it was more complex to reason about (since we need to think about other potential accesses and assignments to that field, vs. only having to reason locally about the single method being changed).




[GitHub] [spark] sandeepvinayak commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1139888789

   @cloud-fan Jenkins looks good now, thanks!




[GitHub] [spark] sandeepvinayak commented on pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on PR #36680:
URL: https://github.com/apache/spark/pull/36680#issuecomment-1142682897

   @JoshRosen Can you please review this when you get a chance? Also, it would be great if we could get this fix into the next release. Thanks!

