You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:31 UTC
[14/50] git commit: Use real serializer & manual ordering
Use real serializer & manual ordering
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1dc0440c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1dc0440c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1dc0440c
Branch: refs/heads/master
Commit: 1dc0440c1ac8882c13de99169e5535b005a801e4
Parents: 0f66b7f
Author: Aaron Davidson <aa...@databricks.com>
Authored: Thu Dec 26 23:20:14 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:08 2013 -0800
----------------------------------------------------------------------
.../util/collection/ExternalAppendOnlyMap.scala | 38 ++++++++++++++------
1 file changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1dc0440c/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 991dd18..b804529 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -22,6 +22,10 @@ import java.io._
import scala.collection.mutable.{ArrayBuffer, PriorityQueue}
import scala.reflect.ClassTag
+import org.apache.spark.SparkEnv
+import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.collection.SpillableAppendOnlyMap.KeyHashOrdering
+
/**
* A wrapper for SpillableAppendOnlyMap that handles two cases:
*
@@ -31,18 +35,20 @@ import scala.reflect.ClassTag
* (2) Otherwise, group values of the same key together before disk spill, and merge them
* into combiners only after reading them back from disk.
*/
-class ExternalAppendOnlyMap[K, V, C: ClassTag](
+private[spark] class ExternalAppendOnlyMap[K, V, C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)
extends Iterable[(K, C)] with Serializable {
+ private val serializer = SparkEnv.get.serializerManager.default
+
private val mergeBeforeSpill: Boolean = mergeCombiners != null
private val map: SpillableAppendOnlyMap[K, V, _, C] = {
if (mergeBeforeSpill) {
new SpillableAppendOnlyMap[K, V, C, C] (createCombiner, mergeValue,
- mergeCombiners, Predef.identity)
+ mergeCombiners, Predef.identity, serializer)
} else {
// Use ArrayBuffer[V] as the intermediate combiner
val createGroup: (V => ArrayBuffer[V]) = value => ArrayBuffer[V](value)
@@ -63,7 +69,7 @@ class ExternalAppendOnlyMap[K, V, C: ClassTag](
combiner.getOrElse(null.asInstanceOf[C])
}
new SpillableAppendOnlyMap[K, V, ArrayBuffer[V], C](createGroup, mergeValueIntoGroup,
- mergeGroups, combineGroup)
+ mergeGroups, combineGroup, serializer)
}
}
@@ -76,13 +82,16 @@ class ExternalAppendOnlyMap[K, V, C: ClassTag](
* An append-only map that spills sorted content to disk when the memory threshold is exceeded.
* A group is an intermediate combiner, with type M equal to either C or ArrayBuffer[V].
*/
-class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
+private[spark] class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
createGroup: V => M,
mergeValue: (M, V) => M,
mergeGroups: (M, M) => M,
- createCombiner: M => C)
+ createCombiner: M => C,
+ serializer: Serializer)
extends Iterable[(K, C)] with Serializable {
+ val ser = serializer.newInstance()
+
var currentMap = new SizeTrackingAppendOnlyMap[K, M]
val oldMaps = new ArrayBuffer[DiskIterator]
val memoryThreshold = {
@@ -90,7 +99,7 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
val bufferPercent = System.getProperty("spark.shuffle.buffer.percent", "0.8").toFloat
bufferSize * bufferPercent
}
- val KMOrdering: Ordering[(K, M)] = Ordering.by(km => km._1.hashCode())
+ val ordering = new KeyHashOrdering[K, M]()
def insert(key: K, value: V): Unit = {
val update: (Boolean, M) => M = (hadVal, oldVal) => {
@@ -103,10 +112,9 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
}
def spill(): Unit = {
- println("******************* SPILL *********************")
val file = File.createTempFile("external_append_only_map", "")
- val out = new ObjectOutputStream(new FileOutputStream(file))
- val it = currentMap.destructiveSortedIterator(KMOrdering)
+ val out = ser.serializeStream(new FileOutputStream(file))
+ val it = currentMap.destructiveSortedIterator(ordering)
while (it.hasNext) {
val kv = it.next()
out.writeObject(kv)
@@ -129,7 +137,7 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
// Order by key hash value
val pq = new PriorityQueue[KMITuple]
- val inputStreams = Seq(currentMap.destructiveSortedIterator(KMOrdering)) ++ oldMaps
+ val inputStreams = Seq(currentMap.destructiveSortedIterator(ordering)) ++ oldMaps
inputStreams.foreach(readFromIterator)
// Read from the given iterator until a key of different hash is retrieved
@@ -181,7 +189,7 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
// Iterate through (K, M) pairs in sorted order from an on-disk map
class DiskIterator(file: File) extends Iterator[(K, M)] {
- val in = new ObjectInputStream(new FileInputStream(file))
+ val in = ser.deserializeStream(new FileInputStream(file))
var nextItem: Option[(K, M)] = None
override def hasNext: Boolean = {
@@ -201,3 +209,11 @@ class SpillableAppendOnlyMap[K, V, M: ClassTag, C: ClassTag](
}
}
}
+
+private[spark] object SpillableAppendOnlyMap { // companion of class SpillableAppendOnlyMap; holds the Ordering used to sort spilled (K, M) pairs
+ private[spark] class KeyHashOrdering[K, M] extends Ordering[(K, M)] { // not `private`: the companion's public `ordering` val has this inferred type, so a private class would escape its defining scope
+ def compare(x: (K, M), y: (K, M)): Int = { // orders pairs by key hashCode only; distinct keys with equal hashes compare as 0
+ x._1.hashCode().compareTo(y._1.hashCode())
+ }
+ }
+}