You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2018/12/11 20:23:21 UTC
[spark] branch branch-2.4 updated: [SPARK-26265][CORE][BRANCH-2.4]
Fix deadlock in BytesToBytesMap.MapIterator when locking both
BytesToBytesMap.MapIterator and TaskMemoryManager
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new e35d287 [SPARK-26265][CORE][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager
e35d287 is described below
commit e35d287dd9fd5b7bd7e06025f535772b482b443c
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Tue Dec 11 12:22:58 2018 -0800
[SPARK-26265][CORE][BRANCH-2.4] Fix deadlock in BytesToBytesMap.MapIterator when locking both BytesToBytesMap.MapIterator and TaskMemoryManager
## What changes were proposed in this pull request?
In `BytesToBytesMap.MapIterator.advanceToNextPage`, we will first lock this `MapIterator` and then `TaskMemoryManager` when going to free a memory page by calling `freePage`. At the same time, it is possible that another memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it acquires memory and causes spilling on this `MapIterator`.
So it ends up with one thread holding the lock on the `MapIterator` object while waiting for the lock on `TaskMemoryManager`, and the other consumer holding the lock on `TaskMemoryManager` while waiting for the lock on the `MapIterator` object.
To avoid the deadlock here, this patch proposes to keep a reference to the page to be freed and to free it only after releasing the lock on the `MapIterator`.
This backports the fix to branch-2.4.
## How was this patch tested?
Added a test, and manually verified the fix by running the test 100 times to make sure there is no deadlock.
Closes #23289 from viirya/SPARK-26265-2.4.
Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/unsafe/map/BytesToBytesMap.java | 12 +++++-
.../apache/spark/memory/TestMemoryConsumer.java | 4 +-
.../unsafe/map/AbstractBytesToBytesMapSuite.java | 47 ++++++++++++++++++++++
3 files changed, 60 insertions(+), 3 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 9b6cbab..6465033 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -267,11 +267,18 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
private void advanceToNextPage() {
+ // SPARK-26265: We will first lock this `MapIterator` and then `TaskMemoryManager` when going
+ // to free a memory page by calling `freePage`. At the same time, it is possibly that another
+ // memory consumer first locks `TaskMemoryManager` and then this `MapIterator` when it
+ // acquires memory and causes spilling on this `MapIterator`. To avoid deadlock here, we keep
+ // reference to the page to free and free it after releasing the lock of `MapIterator`.
+ MemoryBlock pageToFree = null;
+
synchronized (this) {
int nextIdx = dataPages.indexOf(currentPage) + 1;
if (destructive && currentPage != null) {
dataPages.remove(currentPage);
- freePage(currentPage);
+ pageToFree = currentPage;
nextIdx --;
}
if (dataPages.size() > nextIdx) {
@@ -295,6 +302,9 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
}
}
+ if (pageToFree != null) {
+ freePage(pageToFree);
+ }
}
@Override
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index 0bbaea6..6aa577d 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -38,12 +38,12 @@ public class TestMemoryConsumer extends MemoryConsumer {
return used;
}
- void use(long size) {
+ public void use(long size) {
long got = taskMemoryManager.acquireExecutionMemory(size, this);
used += got;
}
- void free(long size) {
+ public void free(long size) {
used -= size;
taskMemoryManager.releaseExecutionMemory(size, this);
}
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 53a233f..278d28f 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -33,6 +33,8 @@ import org.mockito.MockitoAnnotations;
import org.apache.spark.SparkConf;
import org.apache.spark.executor.ShuffleWriteMetrics;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.memory.TestMemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TestMemoryManager;
import org.apache.spark.network.util.JavaUtils;
@@ -667,4 +669,49 @@ public abstract class AbstractBytesToBytesMapSuite {
}
}
+ @Test
+ public void avoidDeadlock() throws InterruptedException {
+ memoryManager.limit(PAGE_SIZE_BYTES);
+ MemoryMode mode = useOffHeapMemoryAllocator() ? MemoryMode.OFF_HEAP: MemoryMode.ON_HEAP;
+ TestMemoryConsumer c1 = new TestMemoryConsumer(taskMemoryManager, mode);
+ BytesToBytesMap map =
+ new BytesToBytesMap(taskMemoryManager, blockManager, serializerManager, 1, 0.5, 1024, false);
+
+ Thread thread = new Thread(() -> {
+ int i = 0;
+ long used = 0;
+ while (i < 10) {
+ c1.use(10000000);
+ used += 10000000;
+ i++;
+ }
+ c1.free(used);
+ });
+
+ try {
+ int i;
+ for (i = 0; i < 1024; i++) {
+ final long[] arr = new long[]{i};
+ final BytesToBytesMap.Location loc = map.lookup(arr, Platform.LONG_ARRAY_OFFSET, 8);
+ loc.append(arr, Platform.LONG_ARRAY_OFFSET, 8, arr, Platform.LONG_ARRAY_OFFSET, 8);
+ }
+
+ // Starts to require memory at another memory consumer.
+ thread.start();
+
+ BytesToBytesMap.MapIterator iter = map.destructiveIterator();
+ for (i = 0; i < 1024; i++) {
+ iter.next();
+ }
+ assertFalse(iter.hasNext());
+ } finally {
+ map.free();
+ thread.join();
+ for (File spillFile : spillFilesCreated) {
+ assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up",
+ spillFile.exists());
+ }
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org