You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/07/31 01:50:33 UTC

spark git commit: [SPARK-19839][CORE] release longArray in BytesToBytesMap

Repository: spark
Updated Branches:
  refs/heads/master f1a798b57 -> 44e501ace


[SPARK-19839][CORE] release longArray in BytesToBytesMap

## What changes were proposed in this pull request?
When BytesToBytesMap spills, its longArray should be released. Otherwise, it may not be released until the task completes. This array may take a significant amount of memory, which cannot be used by later operators such as UnsafeShuffleExternalSorter, resulting in more frequent spills in the sorter. This patch releases the array, because the destructive iterator will not use it anymore.

## How was this patch tested?
Manually tested in production.

Author: Zhan Zhang <zh...@fb.com>

Closes #17180 from zhzhan/memory.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/44e501ac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/44e501ac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/44e501ac

Branch: refs/heads/master
Commit: 44e501ace38ea6e6535a8731e045681175d170ec
Parents: f1a798b
Author: Zhan Zhang <zh...@fb.com>
Authored: Sun Jul 30 18:50:19 2017 -0700
Committer: Xiao Li <ga...@gmail.com>
Committed: Sun Jul 30 18:50:19 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java  | 5 +++++
 .../sql/execution/UnsafeFixedWidthAggregationMapSuite.scala     | 5 +++--
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/44e501ac/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 3b6200e..610ace3 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -258,6 +258,11 @@ public final class BytesToBytesMap extends MemoryConsumer {
       this.destructive = destructive;
       if (destructive) {
         destructiveIterator = this;
+        // longArray will not be used anymore if destructive is true, release it now.
+        if (longArray != null) {
+          freeArray(longArray);
+          longArray = null;
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/44e501ac/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 50d8e30..d194f58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -127,9 +127,10 @@ class UnsafeFixedWidthAggregationMapSuite
       PAGE_SIZE_BYTES
     )
     val groupKey = InternalRow(UTF8String.fromString("cats"))
+    val row = map.getAggregationBuffer(groupKey)
 
     // Looking up a key stores a zero-entry in the map (like Python Counters or DefaultDicts)
-    assert(map.getAggregationBuffer(groupKey) != null)
+    assert(row != null)
     val iter = map.iterator()
     assert(iter.next())
     iter.getKey.getString(0) should be ("cats")
@@ -138,7 +139,7 @@ class UnsafeFixedWidthAggregationMapSuite
 
     // Modifications to rows retrieved from the map should update the values in the map
     iter.getValue.setInt(0, 42)
-    map.getAggregationBuffer(groupKey).getInt(0) should be (42)
+    row.getInt(0) should be (42)
 
     map.free()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org