You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/12/11 20:23:21 UTC

[spark] branch branch-2.4 updated: [SPARK-26265][CORE][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new e35d287  [SPARK-26265][CORE][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager
e35d287 is described below

commit e35d287dd9fd5b7bd7e06025f535772b482b443c
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Dec 11 12:22:58 2018 -0800

    [SPARK-26265][CORE][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager
    
    ## What changes were proposed in this pull request?
    
    In `BytesToBytesMap.MapIterator.advanceToNextPage`, we first lock this `MapIterator` and then `TaskMemoryManager` when freeing a memory page by calling `freePage`. At the same time, it is possible that another memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it acquires memory and causes spilling on this `MapIterator`.
    
    As a result, the thread advancing the `MapIterator` holds the lock on the `MapIterator` object and waits for the lock on `TaskMemoryManager`, while the other consumer holds the lock on `TaskMemoryManager` and waits for the lock on the `MapIterator` object.
    
    To avoid the deadlock, this patch keeps a reference to the page to be freed and frees it after releasing the lock on the `MapIterator`.
    
    This backports the fix to branch-2.4.
    
    ## How was this patch tested?
    
     Added a test and manually ran it 100 times to make sure there is no deadlock.
    
    Closes #23289 from viirya/SPARK-26265-2.4.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../apache/spark/unsafe/map/BytesToBytesMap.java   | 12 +++++-
 .../apache/spark/memory/TestMemoryConsumer.java    |  4 +-
 .../unsafe/map/AbstractBytesToBytesMapSuite.java   | 47 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 3 deletions(-)

diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9b6cbab..6465033 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -267,11 +267,18 @@ public final class BytesToBytesMap extends MemoryConsumer {
     }
 
     private void advanceToNextPage() {
+      // SPARK-26265: We first lock this `MapIterator` and then `TaskMemoryManager` when freeing
+      // a memory page by calling `freePage`. At the same time, it is possible that another
+      // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
+      // acquires memory and causes spilling on this `MapIterator`. To avoid the deadlock, we keep
+      // a reference to the page to free and free it after releasing the lock on `MapIterator`.
+      MemoryBlock pageToFree = null;
+
       synchronized (this) {
         int nextIdx = dataPages.indexOf(currentPage) + 1;
         if (destructive && currentPage != null) {
           dataPages.remove(currentPage);
-          freePage(currentPage);
+          pageToFree = currentPage;
           nextIdx --;
         }
         if (dataPages.size() > nextIdx) {
@@ -295,6 +302,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
           }
         }
       }
+      if (pageToFree != null) {
+        freePage(pageToFree);
+      }
     }
 
     @Override
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index 0bbaea6..6aa577d 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -38,12 +38,12 @@ public class TestMemoryConsumer extends MemoryConsumer {
     return used;
   }
 
-  void use(long size) {
+  public void use(long size) {
     long got = taskMemoryManager.acquireExecutionMemory(size, this);
     used += got;
   }
 
-  void free(long size) {
+  public void free(long size) {
     used -= size;
     taskMemoryManager.releaseExecutionMemory(size, this);
   }
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 53a233f..278d28f 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -33,6 +33,8 @@ import org.mockito.MockitoAnnotations;
 
 import org.apache.spark.SparkConf;
 import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TestMemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.memory.TestMemoryManager;
 import org.apache.spark.network.util.JavaUtils;
@@ -667,4 +669,49 @@ public abstract class AbstractBytesToBytesMapSuite {
     }
   }
 
+  @Test
+  public void avoidDeadlock() throws InterruptedException {
+    memoryManager.limit(PAGE_SIZE_BYTES);
+    MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP;
+    TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
+    BytesToBytesMap map =
+      new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false);
+
+    Thread thread = new Thread(() -> {
+      int i = 0;
+      long used = 0;
+      while (i < 10) {
+        c1.use(10000000);
+        used += 10000000;
+        i++;
+      }
+      c1.free(used);
+    });
+
+    try {
+      int i;
+      for (i = 0; i < 1024; i++) {
+        final long[] arr = new long[]{i};
+        final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+        loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+      }
+
+      // Start acquiring memory through another memory consumer.
+      thread.start();
+
+      BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+      for (i = 0; i < 1024; i++) {
+        iter.next();
+      }
+      assertFalse(iter.hasNext());
+    } finally {
+      map.free();
+      thread.join();
+      for (File spillFile : spillFilesCreated) {
+        assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up",
+          spillFile.exists());
+      }
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org