Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/26 23:55:25 UTC

[GitHub] [spark] JoshRosen commented on a diff in pull request #36680: [SPARK-39283][CORE] Fix deadlock between TaskMemoryManager and UnsafeExternalSorter.SpillableIterator

JoshRosen commented on code in PR #36680:
URL: https://github.com/apache/spark/pull/36680#discussion_r883157576


##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:
##########
@@ -578,58 +580,70 @@ public long getCurrentPageNumber() {
     }
 
     public long spill() throws IOException {
-      synchronized (this) {
-        if (inMemSorter == null) {
-          return 0L;
-        }
-
-        long currentPageNumber = upstream.getCurrentPageNumber();
+      List<MemoryBlock> pagesToFree = Lists.newArrayList();
+      try {
+        synchronized (this) {
+          if (inMemSorter == null) {
+            return 0L;
+          }
 
-        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
-        if (numRecords > 0) {
-          // Iterate over the records that have not been returned and spill them.
-          final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(
-                  blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
-          spillIterator(upstream, spillWriter);
-          spillWriters.add(spillWriter);
-          upstream = spillWriter.getReader(serializerManager);
-        } else {
-          // Nothing to spill as all records have been read already, but do not return yet, as the
-          // memory still has to be freed.
-          upstream = null;
-        }
+          long currentPageNumber = upstream.getCurrentPageNumber();
+
+          ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+          if (numRecords > 0) {
+            // Iterate over the records that have not been returned and spill them.
+            final UnsafeSorterSpillWriter spillWriter = new UnsafeSorterSpillWriter(
+                    blockManager, fileBufferSizeBytes, writeMetrics, numRecords);
+            spillIterator(upstream, spillWriter);
+            spillWriters.add(spillWriter);
+            upstream = spillWriter.getReader(serializerManager);
+          } else {
+            // Nothing to spill as all records have been read already, but do not return yet, as the
+            // memory still has to be freed.
+            upstream = null;
+          }
 
-        long released = 0L;
-        synchronized (UnsafeExternalSorter.this) {
-          // release the pages except the one that is used. There can still be a caller that
-          // is accessing the current record. We free this page in that caller's next loadNext()
-          // call.
-          for (MemoryBlock page : allocatedPages) {
-            if (!loaded || page.pageNumber != currentPageNumber) {
-              released += page.size();
-              freePage(page);
-            } else {
-              lastPage = page;
+          long released = 0L;
+          synchronized (UnsafeExternalSorter.this) {
+            // release the pages except the one that is used. There can still be a caller that
+            // is accessing the current record. We free this page in that caller's next loadNext()
+            // call.
+            for (MemoryBlock page : allocatedPages) {
+              if (!loaded || page.pageNumber != currentPageNumber) {
+                released += page.size();
+                pagesToFree.add(page);
+              } else {
+                lastPage = page;
+              }
+            }
+            allocatedPages.clear();
+            if (lastPage != null) {
+              // Add the last page back to the list of allocated pages to make sure it gets freed in
+              // case loadNext() never gets called again.
+              allocatedPages.add(lastPage);
             }
           }
-          allocatedPages.clear();
-          if (lastPage != null) {
-            // Add the last page back to the list of allocated pages to make sure it gets freed in
-            // case loadNext() never gets called again.
-            allocatedPages.add(lastPage);
-          }
-        }
 
-        // in-memory sorter will not be used after spilling
-        assert(inMemSorter != null);
-        released += inMemSorter.getMemoryUsage();
-        totalSortTimeNanos += inMemSorter.getSortTimeNanos();
-        inMemSorter.freeMemory();
-        inMemSorter = null;
-        taskContext.taskMetrics().incMemoryBytesSpilled(released);
-        taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
-        totalSpillBytes += released;
-        return released;
+          // in-memory sorter will not be used after spilling
+          assert (inMemSorter != null);
+          released += inMemSorter.getMemoryUsage();
+          totalSortTimeNanos += inMemSorter.getSortTimeNanos();
+          inMemSorter.freeMemory();
+          inMemSorter = null;
+          taskContext.taskMetrics().incMemoryBytesSpilled(released);
+          taskContext.taskMetrics().incDiskBytesSpilled(writeMetrics.bytesWritten());
+          totalSpillBytes += released;
+          return released;
+        }
+      } finally {
+        // Do not free the page while we are locking `SpillableIterator`. The `freePage`
+        // method locks the `TaskMemoryManager`, and it's not a good idea to lock 2 objects in
+        // sequence. We may hit a deadlock if another thread locks `TaskMemoryManager` and
+        // `SpillableIterator` in sequence, which may happen in
+        // `TaskMemoryManager.acquireExecutionMemory`.

Review Comment:
   Minor nit: for consistency with the `loadNext()` method (where this comment was copied from), do you mind moving this comment up so that it's located right before the `pagesToFree.add(page)` call on line 614? As worded, I think the comment makes more sense at that location.
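The following is a minimal, single-threaded sketch of the deferred-free pattern this hunk adopts, assuming a much-simplified model. While the iterator's monitor is held, pages are only collected into `pagesToFree`. `freePage` synchronizes on the memory manager, so it runs in the `finally` block after the `synchronized (this)` block has exited. According to the code comment, the opposite order (manager lock first, then iterator lock) can occur in `TaskMemoryManager.acquireExecutionMemory`, which is why the two monitors must never be nested here.

`Page`, `MemoryManager` and `SpillableIter` are hypothetical stand-ins, not Spark's `MemoryBlock`, `TaskMemoryManager` or `SpillableIterator`. The sketch also omits the real code's handling of the page that is still being read (`lastPage`). Following the suggestion above, the explanatory comment sits right before `pagesToFree.add(page)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the deferred-free pattern in the patch.
// Page, MemoryManager and SpillableIter are illustrative stand-ins, not Spark classes.
public class DeferredFreeSketch {

  /** Stand-in for a memory page; only its size matters here. */
  static final class Page {
    final long size;

    Page(long size) {
      this.size = size;
    }
  }

  /** Stand-in for TaskMemoryManager: freeing a page takes this object's monitor. */
  static final class MemoryManager {
    private long freedBytes = 0L;

    synchronized void freePage(Page page) {
      freedBytes += page.size;
    }

    synchronized long freedBytes() {
      return freedBytes;
    }
  }

  /** Stand-in for SpillableIterator, which guards its page list with its own monitor. */
  static final class SpillableIter {
    private final MemoryManager memoryManager;
    private final List<Page> allocatedPages = new ArrayList<>();

    SpillableIter(MemoryManager memoryManager, List<Page> pages) {
      this.memoryManager = memoryManager;
      this.allocatedPages.addAll(pages);
    }

    long spill() {
      List<Page> pagesToFree = new ArrayList<>();
      try {
        synchronized (this) {
          long released = 0L;
          for (Page page : allocatedPages) {
            released += page.size;
            // Do not free the page while holding this iterator's lock: freePage()
            // locks the memory manager, and another thread may already hold the
            // manager's lock while waiting for ours. Collect it for later instead.
            pagesToFree.add(page);
          }
          allocatedPages.clear();
          return released;
        }
      } finally {
        // The synchronized block has exited by now, so only one monitor is held at a time.
        for (Page page : pagesToFree) {
          memoryManager.freePage(page);
        }
      }
    }
  }

  public static void main(String[] args) {
    MemoryManager mm = new MemoryManager();
    SpillableIter it = new SpillableIter(mm, Arrays.asList(new Page(4096), new Page(1024)));
    System.out.println(it.spill());      // 5120
    System.out.println(mm.freedBytes()); // 5120
    System.out.println(it.spill());      // 0 (nothing left to spill)
  }
}
```

Because the `synchronized` block is nested inside the `try`, its monitor is released when `return released` executes, before the `finally` block runs. The return value is computed under the lock, but the frees happen without it.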



##########
core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:
##########
@@ -578,58 +580,70 @@ public long getCurrentPageNumber() {
     }
 
     public long spill() throws IOException {
-      synchronized (this) {
-        if (inMemSorter == null) {
-          return 0L;
-        }
-
-        long currentPageNumber = upstream.getCurrentPageNumber();
+      List<MemoryBlock> pagesToFree = Lists.newArrayList();

Review Comment:
   Minor nit: Guava says that `Lists.newArrayList` should be treated as deprecated in Java 7+: https://guava.dev/releases/21.0/api/docs/com/google/common/collect/Lists.html#newArrayList--
   
   To avoid us having to update this code in the future, could you change this to 
   
   ```java
   ArrayList<MemoryBlock> pagesToFree = new ArrayList<>();
   ```
   
   and update the imports?
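The requested change is a sketch of swapping the Guava factory for the JDK constructor with Java 7 diamond syntax. The import of `com.google.common.collect.Lists` can then be dropped, but only if nothing else in the file still uses it. `MemoryBlock` below is a hypothetical stand-in for Spark's class, included only so the snippet compiles on its own.

```java
// Before (Guava):
//   import com.google.common.collect.Lists;
//   List<MemoryBlock> pagesToFree = Lists.newArrayList();
// After (JDK only, Java 7+ diamond syntax):
import java.util.ArrayList;

public class PagesToFreeSketch {
  // Hypothetical stand-in for Spark's MemoryBlock; only here so the sketch compiles.
  static final class MemoryBlock {
    final long size;

    MemoryBlock(long size) {
      this.size = size;
    }
  }

  static ArrayList<MemoryBlock> newPagesToFree() {
    // The diamond operator infers <MemoryBlock> from the declared type.
    ArrayList<MemoryBlock> pagesToFree = new ArrayList<>();
    return pagesToFree;
  }

  public static void main(String[] args) {
    ArrayList<MemoryBlock> pagesToFree = newPagesToFree();
    pagesToFree.add(new MemoryBlock(4096));
    System.out.println(pagesToFree.size()); // 1
  }
}
```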



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

