You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/24 04:02:29 UTC

[1/3] git commit: Fixes to AppendOnlyMap:

Updated Branches:
  refs/heads/master 51aa9d6e9 -> 718cc803f


Fixes to AppendOnlyMap:

- Use the Murmur Hash 3 finalization step to scramble the bits of hashCode
  instead of the simpler version in java.util.HashMap; the latter
  had trouble with ranges of consecutive integers. The same Murmur Hash 3
  finalization step is used by fastutil.
- Use Object.equals() instead of Scala's == to compare keys, because the
  latter does extra casts for numeric types (see the equals method in
  https://github.com/scala/scala/blob/master/src/library/scala/runtime/BoxesRunTime.java)


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/7535d7fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/7535d7fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/7535d7fb

Branch: refs/heads/master
Commit: 7535d7fbcbe3c0c2515a2d17a806fa523917e398
Parents: 51aa9d6
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Sat Nov 23 17:21:37 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Sat Nov 23 17:21:37 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/AppendOnlyMap.scala    | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/7535d7fb/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
index f60deaf..8542541 100644
--- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
@@ -56,7 +56,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     var i = 1
     while (true) {
       val curKey = data(2 * pos)
-      if (k.eq(curKey) || k == curKey) {
+      if (k.eq(curKey) || k.equals(curKey)) {
         return data(2 * pos + 1).asInstanceOf[V]
       } else if (curKey.eq(null)) {
         return null.asInstanceOf[V]
@@ -104,7 +104,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     var i = 1
     while (true) {
       val curKey = data(2 * pos)
-      if (k.eq(curKey) || k == curKey) {
+      if (k.eq(curKey) || k.equals(curKey)) {
         val newValue = updateFunc(true, data(2 * pos + 1).asInstanceOf[V])
         data(2 * pos + 1) = newValue.asInstanceOf[AnyRef]
         return newValue
@@ -167,12 +167,11 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   }
 
   /**
-   * Re-hash a value to deal better with hash functions that don't differ
-   * in the lower bits, similar to java.util.HashMap
+   * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
+   * We use the Murmur Hash 3 finalization step that's also used in fastutil.
    */
   private def rehash(h: Int): Int = {
-    val r = h ^ (h >>> 20) ^ (h >>> 12)
-    r ^ (r >>> 7) ^ (r >>> 4)
+    it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
   }
 
   /**
@@ -190,7 +189,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
         data(2 * pos) = key
         data(2 * pos + 1) = value.asInstanceOf[AnyRef]
         return true
-      } else if (curKey.eq(key) || curKey == key) {
+      } else if (curKey.eq(key) || curKey.equals(key)) {
         data(2 * pos + 1) = value.asInstanceOf[AnyRef]
         return false
       } else {


[2/3] git commit: Some other optimizations to AppendOnlyMap:

Posted by rx...@apache.org.
Some other optimizations to AppendOnlyMap:

- Don't check keys for equality when re-inserting due to growing the
  table; the keys will already be unique
- Remember the grow threshold instead of recomputing it on each insert


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9837a602
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9837a602
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9837a602

Branch: refs/heads/master
Commit: 9837a60234964333916ccbf02c8610909462a7ad
Parents: 7535d7f
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Sat Nov 23 17:38:29 2013 -0800
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Sat Nov 23 17:38:29 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/util/AppendOnlyMap.scala   | 82 +++++++++++---------
 1 file changed, 45 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9837a602/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
index 8542541..8bb4ee3 100644
--- a/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/AppendOnlyMap.scala
@@ -35,6 +35,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   private var capacity = nextPowerOf2(initialCapacity)
   private var mask = capacity - 1
   private var curSize = 0
+  private var growThreshold = LOAD_FACTOR * capacity
 
   // Holds keys and values in the same array for memory locality; specifically, the order of
   // elements is key0, value0, key1, value1, key2, value2, etc.
@@ -80,9 +81,23 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
       haveNullValue = true
       return
     }
-    val isNewEntry = putInto(data, k, value.asInstanceOf[AnyRef])
-    if (isNewEntry) {
-      incrementSize()
+    var pos = rehash(key.hashCode) & mask
+    var i = 1
+    while (true) {
+      val curKey = data(2 * pos)
+      if (curKey.eq(null)) {
+        data(2 * pos) = k
+        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+        incrementSize()  // Since we added a new key
+        return
+      } else if (k.eq(curKey) || k.equals(curKey)) {
+        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
+        return
+      } else {
+        val delta = i
+        pos = (pos + delta) & mask
+        i += 1
+      }
     }
   }
 
@@ -161,7 +176,7 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
   /** Increase table size by 1, rehashing if necessary */
   private def incrementSize() {
     curSize += 1
-    if (curSize > LOAD_FACTOR * capacity) {
+    if (curSize > growThreshold) {
       growTable()
     }
   }
@@ -174,33 +189,6 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
     it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
   }
 
-  /**
-   * Put an entry into a table represented by data, returning true if
-   * this increases the size of the table or false otherwise. Assumes
-   * that "data" has at least one empty slot.
-   */
-  private def putInto(data: Array[AnyRef], key: AnyRef, value: AnyRef): Boolean = {
-    val mask = (data.length / 2) - 1
-    var pos = rehash(key.hashCode) & mask
-    var i = 1
-    while (true) {
-      val curKey = data(2 * pos)
-      if (curKey.eq(null)) {
-        data(2 * pos) = key
-        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
-        return true
-      } else if (curKey.eq(key) || curKey.equals(key)) {
-        data(2 * pos + 1) = value.asInstanceOf[AnyRef]
-        return false
-      } else {
-        val delta = i
-        pos = (pos + delta) & mask
-        i += 1
-      }
-    }
-    return false  // Never reached but needed to keep compiler happy
-  }
-
   /** Double the table's size and re-hash everything */
   private def growTable() {
     val newCapacity = capacity * 2
@@ -210,16 +198,36 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) extends Iterable[(K, V)] wi
       throw new Exception("Can't make capacity bigger than 2^29 elements")
     }
     val newData = new Array[AnyRef](2 * newCapacity)
-    var pos = 0
-    while (pos < capacity) {
-      if (!data(2 * pos).eq(null)) {
-        putInto(newData, data(2 * pos), data(2 * pos + 1))
+    val newMask = newCapacity - 1
+    // Insert all our old values into the new array. Note that because our old keys are
+    // unique, there's no need to check for equality here when we insert.
+    var oldPos = 0
+    while (oldPos < capacity) {
+      if (!data(2 * oldPos).eq(null)) {
+        val key = data(2 * oldPos)
+        val value = data(2 * oldPos + 1)
+        var newPos = rehash(key.hashCode) & newMask
+        var i = 1
+        var keepGoing = true
+        while (keepGoing) {
+          val curKey = newData(2 * newPos)
+          if (curKey.eq(null)) {
+            newData(2 * newPos) = key
+            newData(2 * newPos + 1) = value
+            keepGoing = false
+          } else {
+            val delta = i
+            newPos = (newPos + delta) & newMask
+            i += 1
+          }
+        }
       }
-      pos += 1
+      oldPos += 1
     }
     data = newData
     capacity = newCapacity
-    mask = newCapacity - 1
+    mask = newMask
+    growThreshold = LOAD_FACTOR * newCapacity
   }
 
   private def nextPowerOf2(n: Int): Int = {


[3/3] git commit: Merge pull request #200 from mateiz/hash-fix

Posted by rx...@apache.org.
Merge pull request #200 from mateiz/hash-fix

AppendOnlyMap fixes

- Chose a more random reshuffling step for values returned by Object.hashCode to avoid the long probe sequences that were occurring for consecutive integer keys (e.g. `sc.makeRDD(1 to 100000000, 100).map(t => (t, t)).reduceByKey(_ + _).count`)
- Some other small optimizations throughout (see commit comments)


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/718cc803
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/718cc803
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/718cc803

Branch: refs/heads/master
Commit: 718cc803f7e0600c9ab265022eb6027926a38010
Parents: 51aa9d6 9837a60
Author: Reynold Xin <rx...@apache.org>
Authored: Sun Nov 24 11:02:02 2013 +0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Sun Nov 24 11:02:02 2013 +0800

----------------------------------------------------------------------
 .../org/apache/spark/util/AppendOnlyMap.scala   | 93 +++++++++++---------
 1 file changed, 50 insertions(+), 43 deletions(-)
----------------------------------------------------------------------