Posted to commits@spark.apache.org by we...@apache.org on 2018/01/25 07:25:15 UTC

spark git commit: [SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily

Repository: spark
Updated Branches:
  refs/heads/master 6f0ba8472 -> 45b4bbfdd


[SPARK-23129][CORE] Make deserializeStream of DiskMapIterator init lazily

## What changes were proposed in this pull request?

Currently, the deserializeStream in ExternalAppendOnlyMap#DiskMapIterator is initialized as soon as the DiskMapIterator instance is created. This causes memory overhead when an ExternalAppendOnlyMap spills many times.

We can avoid this by initializing deserializeStream the first time it is used.
This patch makes the initialization of deserializeStream lazy, as sketched below.
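
To make the pattern concrete, here is a minimal, self-contained sketch of lazy stream initialization in an iterator. The names (LazyStreamIterator, stream, finished) are illustrative only, not the actual Spark internals, and a byte array stands in for one spilled batch file:

```scala
import java.io.{ByteArrayInputStream, DataInputStream}

// Hypothetical iterator: the stream is opened on first use, not in the
// constructor, so iterators that are created but never consumed cost no
// stream buffers.
class LazyStreamIterator(bytes: Array[Byte]) extends Iterator[Int] {
  private var stream: DataInputStream = null   // opened lazily
  private var nextItem: java.lang.Integer = null
  private var finished = false

  override def hasNext: Boolean = {
    if (nextItem == null && !finished) {
      if (stream == null) {
        // First use: only now do we pay for the stream and its buffers.
        stream = new DataInputStream(new ByteArrayInputStream(bytes))
      }
      if (stream.available() >= 4) {
        nextItem = stream.readInt()
      } else {
        stream.close()
        finished = true
      }
    }
    nextItem != null
  }

  override def next(): Int = {
    // Route through hasNext so lazy initialization always runs first,
    // mirroring the next() change in this patch.
    if (!hasNext) throw new NoSuchElementException
    val item = nextItem.intValue()
    nextItem = null
    item
  }
}
```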

## How was this patch tested?

Existing tests

Author: zhoukang <zh...@gmail.com>

Closes #20292 from caneGuy/zhoukang/lay-diskmapiterator.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45b4bbfd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45b4bbfd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45b4bbfd

Branch: refs/heads/master
Commit: 45b4bbfddc18a77011c3bc1bfd71b2cd3466443c
Parents: 6f0ba84
Author: zhoukang <zh...@gmail.com>
Authored: Thu Jan 25 15:24:52 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jan 25 15:24:52 2018 +0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala    | 17 ++++++++++-------
 1 file changed, 10 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/45b4bbfd/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 375f4a6..5c6dd45 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -463,7 +463,7 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    private var deserializeStream = nextBatchStream()
+    private var deserializeStream: DeserializationStream = null
     private var nextItem: (K, C) = null
     private var objectsRead = 0
 
@@ -528,7 +528,11 @@ class ExternalAppendOnlyMap[K, V, C](
     override def hasNext: Boolean = {
       if (nextItem == null) {
         if (deserializeStream == null) {
-          return false
+          // deserializeStream has not been initialized yet; open it lazily
+          deserializeStream = nextBatchStream()
+          if (deserializeStream == null) {
+            return false
+          }
         }
         nextItem = readNextItem()
       }
@@ -536,19 +540,18 @@ class ExternalAppendOnlyMap[K, V, C](
     }
 
     override def next(): (K, C) = {
-      val item = if (nextItem == null) readNextItem() else nextItem
-      if (item == null) {
+      if (!hasNext) {
         throw new NoSuchElementException
       }
+      val item = nextItem
       nextItem = null
       item
     }
 
     private def cleanup() {
       batchIndex = batchOffsets.length  // Prevent reading any other batch
-      val ds = deserializeStream
-      if (ds != null) {
-        ds.close()
+      if (deserializeStream != null) {
+        deserializeStream.close()
         deserializeStream = null
       }
       if (fileStream != null) {

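A note on why the next() rewrite above belongs with the lazy initialization: once the stream is opened lazily, the old next(), which called readNextItem() directly, would dereference a null deserializeStream when next() is the first method invoked. Delegating to hasNext guarantees the stream is opened (or the iterator is reported empty) before any read. A quick check against the hypothetical LazyStreamIterator sketched in the description:

```scala
// Four big-endian bytes encode the Int 42.
val it = new LazyStreamIterator(Array[Byte](0, 0, 0, 42))
assert(it.next() == 42)  // next() delegates to hasNext, which opens the stream
assert(!it.hasNext)      // exhausted: stream closed, no further reads
```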

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org