You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2018/12/11 13:09:55 UTC

[spark] branch master updated: [SPARK-26265][CORE] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a3bbca9  [SPARK-26265][CORE] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager
a3bbca9 is described below

commit a3bbca98d7d120f22727a55cdc448608e6bb9fad
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Dec 11 21:08:39 2018 +0800

    [SPARK-26265][CORE] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager
    
    ## What changes were proposed in this pull request?
    
    In `BytesToBytesMap.MapIterator.advanceToNextPage`, we first lock this `MapIterator` and then `TaskMemoryManager` when going to free a memory page by calling `freePage`. At the same time, it is possible that another memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it acquires memory and causes spilling on this `MapIterator`.
    
    As a result, the thread advancing the `MapIterator` holds the lock on the `MapIterator` object and waits for the lock on `TaskMemoryManager`, while the other consumer holds the lock on `TaskMemoryManager` and waits for the lock on the `MapIterator` object.
    
    To avoid the deadlock, this patch keeps a reference to the page to be freed and frees it after releasing the lock on the `MapIterator`.
    
    ## How was this patch tested?
    
    Added a test, and manually ran it 100 times to make sure there is no deadlock.
    
    Closes #23272 from viirya/SPARK-26265.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java   | 86 ++++++++++++----------
 .../apache/spark/memory/TestMemoryConsumer.java    |  4 +-
 .../unsafe/map/AbstractBytesToBytesMapSuite.java   | 47 ++++++++++++
 3 files changed, 96 insertions(+), 41 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 405e529..fbba002 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -255,11 +255,18 @@ public final class BytesToBytesMap extends MemoryConsumer {
     }
 
     private void advanceToNextPage() {
+      // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
+      // to free a memory page by calling `freePage`. At the same time, it is possibly that another
+      // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
+      // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
+      // reference to the page to free and free it after releasing the lock of `MapIterator`.
+      MemoryBlock pageToFree = null;
+
       synchronized (this) {
         int nextIdx = dataPages.indexOf(currentPage) + 1;
         if (destructive && currentPage != null) {
           dataPages.remove(currentPage);
-          freePage(currentPage);
+          pageToFree = currentPage;
           nextIdx --;
         }
         if (dataPages.size() > nextIdx) {
@@ -283,6 +290,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
           }
         }
       }
+      if (pageToFree != null) {
+        freePage(pageToFree);
+      }
     }
 
     @Override
@@ -329,52 +339,50 @@ public final class BytesToBytesMap extends MemoryConsumer {
       }
     }
 
-    public long spill(long numBytes) throws IOException {
-      synchronized (this) {
-        if (!destructive || dataPages.size() == 1) {
-          return 0L;
-        }
+    public synchronized long spill(long numBytes) throws IOException {
+      if (!destructive || dataPages.size() == 1) {
+        return 0L;
+      }
 
-        updatePeakMemoryUsed();
+      updatePeakMemoryUsed();
 
-        // TODO: use existing ShuffleWriteMetrics
-        ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
+      // TODO: use existing ShuffleWriteMetrics
+      ShuffleWriteMetrics writeMetrics = new ShuffleWriteMetrics();
 
-        long released = 0L;
-        while (dataPages.size() > 0) {
-          MemoryBlock block = dataPages.getLast();
-          // The currentPage is used, cannot be released
-          if (block == currentPage) {
-            break;
-          }
+      long released = 0L;
+      while (dataPages.size() > 0) {
+        MemoryBlock block = dataPages.getLast();
+        // The currentPage is used, cannot be released
+        if (block == currentPage) {
+          break;
+        }
 
-          Object base = block.getBaseObject();
-          long offset = block.getBaseOffset();
-          int numRecords = UnsafeAlignedOffset.getSize(base, offset);
-          int uaoSize = UnsafeAlignedOffset.getUaoSize();
-          offset += uaoSize;
-          final UnsafeSorterSpillWriter writer =
-            new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
-          while (numRecords > 0) {
-            int length = UnsafeAlignedOffset.getSize(base, offset);
-            writer.write(base, offset + uaoSize, length, 0);
-            offset += uaoSize + length + 8;
-            numRecords--;
-          }
-          writer.close();
-          spillWriters.add(writer);
+        Object base = block.getBaseObject();
+        long offset = block.getBaseOffset();
+        int numRecords = UnsafeAlignedOffset.getSize(base, offset);
+        int uaoSize = UnsafeAlignedOffset.getUaoSize();
+        offset += uaoSize;
+        final UnsafeSorterSpillWriter writer =
+                new UnsafeSorterSpillWriter(blockManager, 32 * 1024, writeMetrics, numRecords);
+        while (numRecords > 0) {
+          int length = UnsafeAlignedOffset.getSize(base, offset);
+          writer.write(base, offset + uaoSize, length, 0);
+          offset += uaoSize + length + 8;
+          numRecords--;
+        }
+        writer.close();
+        spillWriters.add(writer);
 
-          dataPages.removeLast();
-          released += block.size();
-          freePage(block);
+        dataPages.removeLast();
+        released += block.size();
+        freePage(block);
 
-          if (released >= numBytes) {
-            break;
-          }
+        if (released >= numBytes) {
+          break;
         }
-
-        return released;
       }
+
+      return released;
     }
 
     @Override
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index 0bbaea6..6aa577d 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -38,12 +38,12 @@ public class TestMemoryConsumer extends MemoryConsumer {
     return used;
   }
 
-  void use(long size) {
+  public void use(long size) {
     long got = taskMemoryManager.acquireExecutionMemory(size, this);
     used += got;
   }
 
-  void free(long size) {
+  public void free(long size) {
     used -= size;
     taskMemoryManager.releaseExecutionMemory(size, this);
   }
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index a11cd53..e5fbafc 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -33,6 +33,8 @@ import org.mockito.MockitoAnnotations;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.network.util.JavaUtils;
@@ -678,4 +680,49 @@ public abstract class AbstractBytesToBytesMapSuite {
     }
   }
 
+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+    memoryManager.limit(PAGE_SIZE_BYTES);
+    MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP;
+    TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
+    BytesToBytesMap map =
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024);
+
+    Thread thread = new Thread(() -> {
+      int i = 0;
+      long used = 0;
+      while (i < 10) {
+        c1.use(10000000);
+        used += 10000000;
+        i++;
+      }
+      c1.free(used);
+    });
+
+    try {
+      int i;
+      for (i = 0; i < 1024; i++) {
+        final long[] arr = new long[]{i};
+        final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+        loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+      }
+
+      // Starts to require memory at another memory consumer.
+      thread.start();
+
+      BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+      for (i = 0; i < 1024; i++) {
+        iter.next();
+      }
+      assertFalse(iter.hasNext());
+    } finally {
+      map.free();
+      thread.join();
+      for (File spillFile : spillFilesCreated) {
+        assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up",
+          spillFile.exists());
+      }
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org