You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:31 UTC

[14/50] git commit: Use real serializer & manual ordering

Use real serializer & manual ordering


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1dc0440c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1dc0440c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1dc0440c

Branch: refs/heads/master
Commit: 1dc0440c1ac8882c13de99169e5535b005a801e4
Parents: 0f66b7f
Author: Aaron Davidson <aa...@databricks.com>
Authored: Thu Dec 26 23:20:14 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:08 2013 -0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala | 38 ++++++++++++++------
 1 file changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1dc0440c/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 991dd18..b804529 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -22,6 +22,10 @@ import java.io._
 import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
 import scala.reflect.ClassTag
 
+import org.apache.spark.SparkEnv
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.collection.SpillableAppendOnlyMap.KeyHashOrdering
+
 /**
  * A wrapper for SpillableAppendOnlyMap that handles two cases:
  *
@@ -31,18 +35,20 @@ import scala.reflect.ClassTag
  * (2)  Otherwise, group values of the same key together before disk spill, and merge them
  *      into combiners only after reading them back from disk.
  */
-class ExternalAppendOnlyMap[K, V, C: ClassTag](
+private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C)
   extends Iterable[(K, C)] with Serializable {
 
+  private val serializer = SparkEnv.get.serializerManager.default
+
   private val mergeBeforeSpill: Boolean = mergeCombiners != null
 
   private val map: SpillableAppendOnlyMap[K, V, _, C] = {
     if (mergeBeforeSpill) {
       new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue,
-        mergeCombiners, Predef.identity)
+        mergeCombiners, Predef.identity, serializer)
     } else {
       // Use ArrayBuffer[V] as the intermediate combiner
       val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
@@ -63,7 +69,7 @@ class ExternalAppendOnlyMap[K, V, C: ClassTag](
         combiner.getOrElse(null.asInstanceOf[C])
       }
       new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup, mergeValueIntoGroup,
-        mergeGroups, combineGroup)
+        mergeGroups, combineGroup, serializer)
     }
   }
 
@@ -76,13 +82,16 @@ class ExternalAppendOnlyMap[K, V, C: ClassTag](
  * An append-only map that spills sorted content to disk when the memory threshold is exceeded.
  * A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V].
  */
-class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
+private[spark] class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
     createGroup: V => M,
     mergeValue: (M, V) => M,
     mergeGroups: (M, M) => M,
-    createCombiner: M => C)
+    createCombiner: M => C,
+    serializer: Serializer)
   extends Iterable[(K, C)] with Serializable {
 
+  val ser = serializer.newInstance()
+
   var currentMap = new SizeTrackingAppendOnlyMap[K, M]
   val oldMaps = new ArrayBuffer[DiskIterator]
   val memoryThreshold = {
@@ -90,7 +99,7 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
     val bufferPercent = System.getProperty("spark.shuffle.buffer.percent", "0.8").toFloat
     bufferSize * bufferPercent
   }
-  val KMOrdering: Ordering[(K, M)] = Ordering.by(km => km._1.hashCode())
+  val ordering = new KeyHashOrdering[K, M]()
 
   def insert(key: K, value: V): Unit = {
     val update: (Boolean, M) => M = (hadVal, oldVal) => {
@@ -103,10 +112,9 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
   }
 
   def spill(): Unit = {
-    println("******************* SPILL *********************")
     val file = File.createTempFile("external_append_only_map", "")
-    val out = new ObjectOutputStream(new FileOutputStream(file))
-    val it = currentMap.destructiveSortedIterator(KMOrdering)
+    val out = ser.serializeStream(new FileOutputStream(file))
+    val it = currentMap.destructiveSortedIterator(ordering)
     while (it.hasNext) {
       val kv = it.next()
       out.writeObject(kv)
@@ -129,7 +137,7 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
 
     // Order by key hash value
     val pq = new PriorityQueue[KMITuple]
-    val inputStreams = Seq(currentMap.destructiveSortedIterator(KMOrdering)) ++ oldMaps
+    val inputStreams = Seq(currentMap.destructiveSortedIterator(ordering)) ++ oldMaps
     inputStreams.foreach(readFromIterator)
 
     // Read from the given iterator until a key of different hash is retrieved
@@ -181,7 +189,7 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
 
   // Iterate through (K, M) pairs in sorted order from an on-disk map
   class DiskIterator(file: File) extends Iterator[(K, M)] {
-    val in = new ObjectInputStream(new FileInputStream(file))
+    val in = ser.deserializeStream(new FileInputStream(file))
     var nextItem: Option[(K, M)] = None
 
     override def hasNext: Boolean = {
@@ -201,3 +209,11 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
     }
   }
 }
+
+private[spark] object SpillableAppendOnlyMap {
+  private class KeyHashOrdering[K, M] extends Ordering[(K, M)] {
+    def compare(x: (K, M), y: (K, M)): Int = {
+      x._1.hashCode().compareTo(y._1.hashCode())
+    }
+  }
+}