You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/11 01:25:33 UTC

[16/50] git commit: Sort AppendOnlyMap in-place

Sort AppendOnlyMap in-place


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ec8c5dc6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ec8c5dc6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ec8c5dc6

Branch: refs/heads/master
Commit: ec8c5dc644ce97c8cf6e13ba2b216ddbe16e9e0a
Parents: a515706
Author: Andrew Or <an...@gmail.com>
Authored: Thu Dec 26 21:22:38 2013 -0800
Committer: Andrew Or <an...@gmail.com>
Committed: Thu Dec 26 23:40:08 2013 -0800

----------------------------------------------------------------------
 .../spark/util/collection/AppendOnlyMap.scala   | 33 ++++++++++++++++++++
 .../util/collection/ExternalAppendOnlyMap.scala | 32 ++++++++++---------
 2 files changed, 51 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec8c5dc6/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
index cb0ca8f..38f3c55 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util.collection
 
+import java.util
+
 /**
  * A simple open hash table optimized for the append-only use case, where keys
  * are never removed, but the value for each key may be changed.
@@ -234,4 +236,35 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     val highBit = Integer.highestOneBit(n)
     if (highBit == n) n else highBit << 1
   }
+
+  // Return an iterator of the map in sorted order.
+  // Note that the validity of the map is no longer preserved.
+  def destructiveSortedIterator(ord: Ordering[(K, V)]): Iterator[(K, V)] = {
+    var keyIndex, newIndex = 0
+    while (keyIndex < capacity) {
+      if (data(2 * keyIndex) != null) {
+        data(newIndex) = (data(2 * keyIndex), data(2 * keyIndex + 1))
+        newIndex += 1
+      }
+      keyIndex += 1
+    }
+    // sort
+    assert(newIndex == curSize)
+    val rawOrdering = new Ordering[AnyRef] {
+      def compare(x: AnyRef, y: AnyRef): Int ={
+        ord.compare(x.asInstanceOf[(K, V)], y.asInstanceOf[(K, V)])
+      }
+    }
+    util.Arrays.sort(data, 0, curSize, rawOrdering)
+
+    new Iterator[(K, V)] {
+      var i = 0
+      def hasNext = i < curSize
+      def next(): (K, V) = {
+        val item = data(i).asInstanceOf[(K, V)]
+        i += 1
+        item
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec8c5dc6/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 4bda763..ed8b1d3 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -88,6 +88,7 @@ class SpillableAppendOnlyMap[K, V, M, C](
     val bufferPercent = System.getProperty("spark.shuffle.buffer.percent", "0.8").toFloat
     bufferSize * bufferPercent
   }
+  val KMOrdering: Ordering[(K, M)] = Ordering.by(km => km._1.hashCode())
 
   def insert(key: K, value: V): Unit = {
     val update: (Boolean, M) => M = (hadVal, oldVal) => {
@@ -100,10 +101,14 @@ class SpillableAppendOnlyMap[K, V, M, C](
   }
 
   def spill(): Unit = {
+    println("******************* SPILL *********************")
     val file = File.createTempFile("external_append_only_map", "")
     val out = new ObjectOutputStream(new FileOutputStream(file))
-    val sortedMap = currentMap.iterator.toList.sortBy(kv => kv._1.hashCode())
-    sortedMap.foreach(out.writeObject)
+    val it = currentMap.destructiveSortedIterator(KMOrdering)
+    while (it.hasNext) {
+      val kv = it.next()
+      out.writeObject(kv)
+    }
     out.close()
     currentMap = new SizeTrackingAppendOnlyMap[K, M]
     oldMaps.append(new DiskIterator(file))
@@ -115,8 +120,8 @@ class SpillableAppendOnlyMap[K, V, M, C](
   class ExternalIterator extends Iterator[(K, C)] {
 
     // Order by key hash value
-    val pq = PriorityQueue[KMITuple]()(Ordering.by(_.key.hashCode()))
-    val inputStreams = Seq(new MemoryIterator(currentMap)) ++ oldMaps
+    val pq = new PriorityQueue[KMITuple]
+    val inputStreams = Seq(currentMap.destructiveSortedIterator(KMOrdering)) ++ oldMaps
     inputStreams.foreach(readFromIterator)
 
     // Read from the given iterator until a key of different hash is retrieved
@@ -127,7 +132,10 @@ class SpillableAppendOnlyMap[K, V, M, C](
         pq.enqueue(KMITuple(k, m, it))
         minHash match {
           case None => minHash = Some(k.hashCode())
-          case Some(expectedHash) if k.hashCode() != expectedHash => return
+          case Some(expectedHash) =>
+            if (k.hashCode() != expectedHash) {
+              return
+            }
         }
       }
     }
@@ -156,15 +164,11 @@ class SpillableAppendOnlyMap[K, V, M, C](
       (minKey, createCombiner(minGroup))
     }
 
-    case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)])
-  }
-
-  // Iterate through (K, M) pairs in sorted order from the in-memory map
-  class MemoryIterator(map: AppendOnlyMap[K, M]) extends Iterator[(K, M)] {
-    val sortedMap = currentMap.iterator.toList.sortBy(km => km._1.hashCode())
-    val it = sortedMap.iterator
-    override def hasNext: Boolean = it.hasNext
-    override def next(): (K, M) = it.next()
+    case class KMITuple(key: K, group: M, iterator: Iterator[(K, M)]) extends Ordered[KMITuple] {
+      def compare(other: KMITuple): Int = {
+        -key.hashCode().compareTo(other.key.hashCode())
+      }
+    }
   }
 
   // Iterate through (K, M) pairs in sorted order from an on-disk map