Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/14 07:32:45 UTC

[3/7] git commit: Get rid of spill map in SparkEnv

Get rid of spill map in SparkEnv
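
Previously, each ExternalAppendOnlyMap registered its spill total in a shared
SparkEnv.bytesSpilledMap keyed by task ID, and the Executor copied that value into
TaskMetrics when the task finished. With this change the map exposes the running
total through a bytesSpilled getter, and the call sites (Aggregator and CoGroupedRDD)
write it directly into the task's metrics, so SparkEnv no longer needs the map or its
per-task cleanup. A minimal sketch of the resulting call-site pattern (simplified from
the Aggregator hunk below; all names are as they appear in the patch):

    val combiners =
      new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
    while (iter.hasNext) {
      val (k, v) = iter.next()
      combiners.insert(k, v)
    }
    // bytesSpilled is now a plain accessor on the map (backed by _bytesSpilled),
    // recorded straight into the task's metrics instead of a SparkEnv-wide map.
    context.taskMetrics.bytesSpilled = combiners.bytesSpilled
    combiners.iterator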


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/8d40e722
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/8d40e722
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/8d40e722

Branch: refs/heads/master
Commit: 8d40e7222f2a0a421349621105dc4c69bd7f1bb8
Parents: bb8098f
Author: Andrew Or <an...@gmail.com>
Authored: Sun Jan 12 22:34:33 2014 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Sun Jan 12 22:34:33 2014 -0800

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/Aggregator.scala        | 7 +++----
 core/src/main/scala/org/apache/spark/SparkEnv.scala          | 3 ---
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 2 --
 core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala  | 2 +-
 .../apache/spark/util/collection/ExternalAppendOnlyMap.scala | 8 +++-----
 5 files changed, 7 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 0609b7b..c46b7bd 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -47,13 +47,12 @@ case class Aggregator[K, V, C] (
       }
       combiners.iterator
     } else {
-      val combiners =
-        new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
       while (iter.hasNext) {
         val (k, v) = iter.next()
         combiners.insert(k, v)
       }
-      combiners.registerBytesSpilled(context.attemptId)
+      context.taskMetrics.bytesSpilled = combiners.bytesSpilled
       combiners.iterator
     }
   }
@@ -76,7 +75,7 @@ case class Aggregator[K, V, C] (
         val (k, c) = iter.next()
         combiners.insert(k, c)
       }
-      combiners.registerBytesSpilled(context.attemptId)
+      context.taskMetrics.bytesSpilled = combiners.bytesSpilled
       combiners.iterator
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6ec0750..08b592d 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -60,9 +60,6 @@ class SparkEnv private[spark] (
   // All accesses should be manually synchronized
   val shuffleMemoryMap = mutable.HashMap[Long, Long]()
 
-  // A mapping of task ID to number of bytes spilled by that task. This is mainly for book-keeping.
-  val bytesSpilledMap = mutable.HashMap[Long, Long]()
-
   private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
 
   // A general, soft-reference map for metadata needed during HadoopRDD split computation

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d5dae89..dd4aea0 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -229,7 +229,6 @@ private[spark] class Executor(
           m.executorRunTime = (taskFinish - taskStart).toInt
           m.jvmGCTime = gcTime - startGCTime
           m.resultSerializationTime = (afterSerialization - beforeSerialization).toInt
-          m.bytesSpilled = env.bytesSpilledMap.get(taskId).getOrElse(0)
         }
 
         val accumUpdates = Accumulators.values
@@ -285,7 +284,6 @@ private[spark] class Executor(
         shuffleMemoryMap.synchronized {
           shuffleMemoryMap.remove(Thread.currentThread().getId)
         }
-        env.bytesSpilledMap.remove(taskId)
         runningTasks.remove(taskId)
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 6df2b3a..34e8341 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -151,7 +151,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
           map.insert(kv._1, new CoGroupValue(kv._2, depNum))
         }
       }
-      map.registerBytesSpilled(context.attemptId)
+      context.taskMetrics.bytesSpilled = map.bytesSpilled
       new InterruptibleIterator(context, map.iterator)
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/8d40e722/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 0100083..f4e53c4 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -86,7 +86,7 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
   private var spillCount = 0
 
   // Number of bytes spilled in total
-  private var bytesSpilled = 0L
+  private var _bytesSpilled = 0L
 
   private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
   private val syncWrites = sparkConf.getBoolean("spark.shuffle.sync", false)
@@ -164,15 +164,13 @@ private[spark] class ExternalAppendOnlyMap[K, V, C](
       shuffleMemoryMap(Thread.currentThread().getId) = 0
     }
     numPairsInMemory = 0
-    bytesSpilled += mapSize
+    _bytesSpilled += mapSize
   }
 
   /**
    * Register the total number of bytes spilled by this task
    */
-  def registerBytesSpilled(taskId: Long) {
-    SparkEnv.get.bytesSpilledMap(taskId) = bytesSpilled
-  }
+  def bytesSpilled: Long = _bytesSpilled
 
   /**
    * Return an iterator that merges the in-memory map with the spilled maps.