You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/05/21 01:39:30 UTC

spark git commit: [SPARK-7698] Cache and reuse buffers in ExecutorMemoryAllocator when using heap allocation

Repository: spark
Updated Branches:
  refs/heads/master 3c434cbfd -> 7956dd7ab


[SPARK-7698] Cache and reuse buffers in ExecutorMemoryAllocator when using heap allocation

When on-heap memory allocation is used, ExecutorMemoryManager should maintain a cache / pool of buffers for re-use by tasks. This will significantly improve the performance of Tungsten's new sort-shuffle for jobs with many short-lived tasks by eliminating a major source of GC pressure.

This pull request is a minimum viable implementation of this idea. In its current form, this patch significantly improves performance on a stress test that launches huge numbers of short-lived shuffle map tasks back-to-back in the same JVM.

Author: Josh Rosen <jo...@databricks.com>

Closes #6227 from JoshRosen/SPARK-7698 and squashes the following commits:

fd6cb55 [Josh Rosen] SoftReference -> WeakReference
b154e86 [Josh Rosen] WIP sketch of pooling in ExecutorMemoryManager


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7956dd7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7956dd7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7956dd7a

Branch: refs/heads/master
Commit: 7956dd7ab03e1542d89dd94c043f1e5131684199
Parents: 3c434cb
Author: Josh Rosen <jo...@databricks.com>
Authored: Wed May 20 16:37:11 2015 -0700
Committer: Josh Rosen <jo...@databricks.com>
Committed: Wed May 20 16:37:11 2015 -0700

----------------------------------------------------------------------
 .../unsafe/memory/ExecutorMemoryManager.java    | 57 +++++++++++++++++++-
 1 file changed, 55 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7956dd7a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java
----------------------------------------------------------------------
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java
index 62c29c8..cbbe859 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/memory/ExecutorMemoryManager.java
@@ -17,6 +17,12 @@
 
 package org.apache.spark.unsafe.memory;
 
+import java.lang.ref.WeakReference;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import javax.annotation.concurrent.GuardedBy;
+
 /**
  * Manages memory for an executor. Individual operators / tasks allocate memory through
  * {@link TaskMemoryManager} objects, which obtain their memory from ExecutorMemoryManager.
@@ -33,6 +39,12 @@ public class ExecutorMemoryManager {
    */
   final boolean inHeap;
 
+  @GuardedBy("this")
+  private final Map<Long, LinkedList<WeakReference<MemoryBlock>>> bufferPoolsBySize =
+    new HashMap<Long, LinkedList<WeakReference<MemoryBlock>>>();
+
+  private static final int POOLING_THRESHOLD_BYTES = 1024 * 1024;
+
   /**
    * Construct a new ExecutorMemoryManager.
    *
@@ -44,15 +56,56 @@ public class ExecutorMemoryManager {
   }
 
   /**
+   * Returns true if allocations of the given size should go through the pooling mechanism and
+   * false otherwise.
+   */
+  private boolean shouldPool(long size) {
+    // Very small allocations are less likely to benefit from pooling.
+    // At some point, we should explore supporting pooling for off-heap memory, but for now we'll
+    // ignore that case in the interest of simplicity.
+    return size >= POOLING_THRESHOLD_BYTES && allocator instanceof HeapMemoryAllocator;
+  }
+
+  /**
    * Allocates a contiguous block of memory. Note that the allocated memory is not guaranteed
    * to be zeroed out (call `zero()` on the result if this is necessary).
    */
   MemoryBlock allocate(long size) throws OutOfMemoryError {
-    return allocator.allocate(size);
+    if (shouldPool(size)) {
+      synchronized (this) {
+        final LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        if (pool != null) {
+          while (!pool.isEmpty()) {
+            final WeakReference<MemoryBlock> blockReference = pool.pop();
+            final MemoryBlock memory = blockReference.get();
+            if (memory != null) {
+              assert (memory.size() == size);
+              return memory;
+            }
+          }
+          bufferPoolsBySize.remove(size);
+        }
+      }
+      return allocator.allocate(size);
+    } else {
+      return allocator.allocate(size);
+    }
   }
 
   void free(MemoryBlock memory) {
-    allocator.free(memory);
+    final long size = memory.size();
+    if (shouldPool(size)) {
+      synchronized (this) {
+        LinkedList<WeakReference<MemoryBlock>> pool = bufferPoolsBySize.get(size);
+        if (pool == null) {
+          pool = new LinkedList<WeakReference<MemoryBlock>>();
+          bufferPoolsBySize.put(size, pool);
+        }
+        pool.add(new WeakReference<MemoryBlock>(memory));
+      }
+    } else {
+      allocator.free(memory);
+    }
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org