Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/24 04:04:35 UTC

git commit: Merge pull request #200 from mateiz/hash-fix

Updated Branches:
  refs/heads/branch-0.8 d7ab87e06 -> c59ce1808


Merge pull request #200 from mateiz/hash-fix

AppendOnlyMap fixes

- Chose a more random reshuffling step for values returned by Object.hashCode, to avoid the long probe chains that were happening for consecutive integer keys (e.g. `sc.makeRDD(1 to 100000000, 100).map(t => (t, t)).reduceByKey(_ + _).count`); a small sketch of the difference follows below
- Some other small optimizations throughout (see commit comments)
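
For illustration, here is a small standalone sketch (not part of the patch) contrasting the removed reshuffling step, a java.util.HashMap-style spreader, with the MurmurHash3 finalizer the patch adopts via fastutil's `HashCommon.murmurHash3`. `Int.hashCode` is the integer itself, so under the old spreader consecutive keys stay packed into one contiguous block of buckets, while the Murmur3 mix scatters them across the table:

    object RehashDemo {
      // Removed reshuffling step (the java.util.HashMap-style supplemental hash)
      def oldRehash(h: Int): Int = {
        val r = h ^ (h >>> 20) ^ (h >>> 12)
        r ^ (r >>> 7) ^ (r >>> 4)
      }

      // MurmurHash3 32-bit finalization step, the same mixing that
      // it.unimi.dsi.fastutil.HashCommon.murmurHash3 applies
      def murmur3(h0: Int): Int = {
        var h = h0
        h ^= h >>> 16
        h *= 0x85ebca6b
        h ^= h >>> 13
        h *= 0xc2b2ae35
        h ^ (h >>> 16)
      }

      def main(args: Array[String]): Unit = {
        val mask = 2048 - 1  // power-of-2 table, as in AppendOnlyMap
        for (k <- 100 to 103) {
          println(s"key=$k  old=${oldRehash(k) & mask}  murmur3=${murmur3(k) & mask}")
        }
        // old buckets: 98, 99, 96, 97 -- one tight cluster, so a nearly
        // full table degenerates into long probe chains; the murmur3
        // buckets come out spread essentially at random
      }
    }

The `k == curKey` to `k.equals(curKey)` change below looks like one of the smaller optimizations: `k` is already known to be non-null at that point, so calling `equals` directly skips the extra null check that Scala's `==` performs.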

(cherry picked from commit 718cc803f7e0600c9ab265022eb6027926a38010)
Signed-off-by: Reynold Xin <rx...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c59ce180
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c59ce180
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c59ce180

Branch: refs/heads/branch-0.8
Commit: c59ce18088df21d71006e0216c775f533eb128e2
Parents: d7ab87e
Author: Reynold Xin <rx...@apache.org>
Authored: Sun Nov 24 11:02:02 2013 +0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Nov 24 11:04:00 2013 +0800

----------------------------------------------------------------------
 .../org/apache/spark/util/AppendOnlyMap.scala   | 93 +++++++++++---------
 1 file changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
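
For reference, the probe loop that update(), changeValue(), and growTable() now each inline advances like this (a minimal standalone sketch; the name probeSequence is illustrative, not from the codebase). On each collision the step delta grows by one, so the offsets from the starting bucket are the triangular numbers 1, 3, 6, 10, ..., a quadratic-probing scheme that visits every slot of a power-of-2 table exactly once within the first `capacity` probes:

    def probeSequence(startPos: Int, mask: Int): Iterator[Int] = {
      var pos = startPos & mask
      var i = 1
      Iterator.continually {
        val cur = pos
        pos = (pos + i) & mask  // pos = (pos + delta) & mask with delta = i
        i += 1
        cur
      }
    }

    // e.g. probeSequence(5, 15).take(6).toList == List(5, 6, 8, 11, 15, 4)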


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c59ce180/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
index f60deaf..8bb4ee3 100644
--- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
@@ -35,6 +35,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   private var capacity = nextPowerOf2(initialCapacity)
   private var mask = capacity - 1
   private var curSize = 0
+  private var growThreshold = LOAD_FACTOR * capacity
 
   // Holds keys and values in the same array for memory locality; specifically, the order of
   // elements is key0, value0, key1, value1, key2, value2, etc.
@@ -56,7 +57,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     var i = 1
     while (true) {
       val curKey = data(2 * pos)
-      if (k.eq(curKey) || k == curKey) {
+      if (k.eq(curKey) || k.equals(curKey)) {
         return data(2 * pos + 1).asInstanceOf[V]
       } else if (curKey.eq(null)) {
         return null.asInstanceOf[V]
@@ -80,9 +81,23 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
       haveNullValue = true
       return
     }
-    val isNewEntry = putInto(data, k, value.asInstanceOf[AnyRef])
-    if (isNewEntry) {
-      incrementSize()
+    var pos = rehash(key.hashCode) & mask
+    var i = 1
+    while (true) {
+      val curKey = data(2 * pos)
+      if (curKey.eq(null)) {
+        data(2 * pos) = k
+        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+        incrementSize()  // Since we added a new key
+        return
+      } else if (k.eq(curKey) || k.equals(curKey)) {
+        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+        return
+      } else {
+        val delta = i
+        pos = (pos + delta) & mask
+        i += 1
+      }
     }
   }
 
@@ -104,7 +119,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     var i = 1
     while (true) {
       val curKey = data(2 * pos)
-      if (k.eq(curKey) || k == curKey) {
+      if (k.eq(curKey) || k.equals(curKey)) {
         val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
         return newValue
@@ -161,45 +176,17 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   /** Increase table size by 1, rehashing if necessary */
   private def incrementSize() {
     curSize += 1
-    if (curSize > LOAD_FACTOR * capacity) {
+    if (curSize > growThreshold) {
       growTable()
     }
   }
 
   /**
-   * Re-hash a value to deal better with hash functions that don't differ
-   * in the lower bits, similar to java.util.HashMap
+   * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
+   * We use the Murmur Hash 3 finalization step that's also used in fastutil.
    */
   private def rehash(h: Int): Int = {
-    val r = h ^ (h >>> 20) ^ (h >>> 12)
-    r ^ (r >>> 7) ^ (r >>> 4)
-  }
-
-  /**
-   * Put an entry into a table represented by data, returning true if
-   * this increases the size of the table or false otherwise. Assumes
-   * that "data" has at least one empty slot.
-   */
-  private def putInto(data: Array[AnyRef], key: AnyRef, value: AnyRef): Boolean = {
-    val mask = (data.length / 2) - 1
-    var pos = rehash(key.hashCode) & mask
-    var i = 1
-    while (true) {
-      val curKey = data(2 * pos)
-      if (curKey.eq(null)) {
-        data(2 * pos) = key
-        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
-        return true
-      } else if (curKey.eq(key) || curKey == key) {
-        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
-        return false
-      } else {
-        val delta = i
-        pos = (pos + delta) & mask
-        i += 1
-      }
-    }
-    return false  // Never reached but needed to keep compiler happy
+    it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
   }
 
   /** Double the table's size and re-hash everything */
@@ -211,16 +198,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
       throw new Exception("Can't make capacity bigger than 2^29 elements")
     }
     val newData = new Array[AnyRef](2 * newCapacity)
-    var pos = 0
-    while (pos < capacity) {
-      if (!data(2 * pos).eq(null)) {
-        putInto(newData, data(2 * pos), data(2 * pos + 1))
+    val newMask = newCapacity - 1
+    // Insert all our old values into the new array. Note that because our old keys are
+    // unique, there's no need to check for equality here when we insert.
+    var oldPos = 0
+    while (oldPos < capacity) {
+      if (!data(2 * oldPos).eq(null)) {
+        val key = data(2 * oldPos)
+        val value = data(2 * oldPos + 1)
+        var newPos = rehash(key.hashCode) & newMask
+        var i = 1
+        var keepGoing = true
+        while (keepGoing) {
+          val curKey = newData(2 * newPos)
+          if (curKey.eq(null)) {
+            newData(2 * newPos) = key
+            newData(2 * newPos + 1) = value
+            keepGoing = false
+          } else {
+            val delta = i
+            newPos = (newPos + delta) & newMask
+            i += 1
+          }
+        }
       }
-      pos += 1
+      oldPos += 1
     }
     data = newData
     capacity = newCapacity
-    mask = newCapacity - 1
+    mask = newMask
+    growThreshold = LOAD_FACTOR * newCapacity
   }
 
   private def nextPowerOf2(n: Int): Int = {