You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/11/26 05:49:00 UTC

[2/3] git commit: Incorporated ideas from pull request #200. - Use Murmur Hash 3 finalization step to scramble the bits of HashCode instead of the simpler version in java.util.HashMap; the latter one had trouble with ranges of consecutive integers. Murmur Hash 3 is used by fastutil. [...]

Incorporated ideas from pull request #200.
- Use Murmur Hash 3 finalization step to scramble the bits of HashCode
  instead of the simpler version in java.util.HashMap; the latter one
  had trouble with ranges of consecutive integers. Murmur Hash 3 is used
  by fastutil.

- Don't check keys for equality when re-inserting due to growing the
  table; the keys will already be unique

- Remember the grow threshold instead of recomputing it on each insert


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/466fd064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/466fd064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/466fd064

Branch: refs/heads/master
Commit: 466fd06475d8ed262c456421ed2dceba54229db1
Parents: 95c55df
Author: Reynold Xin <rx...@apache.org>
Authored: Mon Nov 25 18:27:26 2013 +0800
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Nov 25 18:27:26 2013 +0800

----------------------------------------------------------------------
 .../spark/util/collection/OpenHashSet.scala     | 107 ++++++++++---------
 1 file changed, 57 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/466fd064/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
index 4592e4f..40986e3 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -79,6 +79,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
   protected var _capacity = nextPowerOf2(initialCapacity)
   protected var _mask = _capacity - 1
   protected var _size = 0
+  protected var _growThreshold = (loadFactor * _capacity).toInt
 
   protected var _bitset = new BitSet(_capacity)
 
@@ -115,7 +116,29 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
    * @return The position where the key is placed, plus the highest order bit is set if the key
    *         exists previously.
    */
-  def addWithoutResize(k: T): Int = putInto(_bitset, _data, k)
+  def addWithoutResize(k: T): Int = {
+    var pos = hashcode(hasher.hash(k)) & _mask
+    var i = 1
+    while (true) {
+      if (!_bitset.get(pos)) {
+        // This is a new key.
+        _data(pos) = k
+        _bitset.set(pos)
+        _size += 1
+        return pos | NONEXISTENCE_MASK
+      } else if (_data(pos) == k) {
+        // Found an existing key.
+        return pos
+      } else {
+        val delta = i
+        pos = (pos + delta) & _mask
+        i += 1
+      }
+    }
+    // Never reached here
+    assert(INVALID_POS != INVALID_POS)
+    INVALID_POS
+  }
 
   /**
    * Rehash the set if it is overloaded.
@@ -126,7 +149,7 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
    *                 to a new position (in the new data array).
    */
   def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-    if (_size > loadFactor * _capacity) {
+    if (_size > _growThreshold) {
       rehash(k, allocateFunc, moveFunc)
     }
   }
@@ -161,37 +184,6 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
   def nextPos(fromPos: Int): Int = _bitset.nextSetBit(fromPos)
 
   /**
-   * Put an entry into the set. Return the position where the key is placed. In addition, the
-   * highest bit in the returned position is set if the key exists prior to this put.
-   *
-   * This function assumes the data array has at least one empty slot.
-   */
-  private def putInto(bitset: BitSet, data: Array[T], k: T): Int = {
-    val mask = data.length - 1
-    var pos = hashcode(hasher.hash(k)) & mask
-    var i = 1
-    while (true) {
-      if (!bitset.get(pos)) {
-        // This is a new key.
-        data(pos) = k
-        bitset.set(pos)
-        _size += 1
-        return pos | NONEXISTENCE_MASK
-      } else if (data(pos) == k) {
-        // Found an existing key.
-        return pos
-      } else {
-        val delta = i
-        pos = (pos + delta) & mask
-        i += 1
-      }
-    }
-    // Never reached here
-    assert(INVALID_POS != INVALID_POS)
-    INVALID_POS
-  }
-
-  /**
    * Double the table's size and re-hash everything. We are not really using k, but it is declared
    * so Scala compiler can specialize this method (which leads to calling the specialized version
    * of putInto).
@@ -204,34 +196,49 @@ class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
    */
   private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
     val newCapacity = _capacity * 2
-    require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
-
     allocateFunc(newCapacity)
-    val newData = new Array[T](newCapacity)
     val newBitset = new BitSet(newCapacity)
-    var pos = 0
-    _size = 0
-    while (pos < _capacity) {
-      if (_bitset.get(pos)) {
-        val newPos = putInto(newBitset, newData, _data(pos))
-        moveFunc(pos, newPos & POSITION_MASK)
+    val newData = new Array[T](newCapacity)
+    val newMask = newCapacity - 1
+
+    var oldPos = 0
+    while (oldPos < capacity) {
+      if (_bitset.get(oldPos)) {
+        val key = _data(oldPos)
+        var newPos = hashcode(hasher.hash(key)) & newMask
+        var i = 1
+        var keepGoing = true
+        // Keys in the old table are already unique, so no equality check is needed on insert;
+        // this loop has one fewer branch than the similar code path in addWithoutResize.
+        while (keepGoing) {
+          if (!newBitset.get(newPos)) {
+            // Inserting the key at newPos
+            newData(newPos) = key
+            newBitset.set(newPos)
+            moveFunc(oldPos, newPos)
+            keepGoing = false
+          } else {
+            val delta = i
+            newPos = (newPos + delta) & newMask
+            i += 1
+          }
+        }
       }
-      pos += 1
+      oldPos += 1
     }
+
     _bitset = newBitset
     _data = newData
     _capacity = newCapacity
-    _mask = newCapacity - 1
+    _mask = newMask
+    _growThreshold = (loadFactor * newCapacity).toInt
   }
 
   /**
-   * Re-hash a value to deal better with hash functions that don't differ
-   * in the lower bits, similar to java.util.HashMap
+   * Re-hash a value to deal better with hash functions that don't differ in the lower bits.
+   * We use the Murmur Hash 3 finalization step that's also used in fastutil.
    */
-  private def hashcode(h: Int): Int = {
-    val r = h ^ (h >>> 20) ^ (h >>> 12)
-    r ^ (r >>> 7) ^ (r >>> 4)
-  }
+  private def hashcode(h: Int): Int = it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
 
   private def nextPowerOf2(n: Int): Int = {
     val highBit = Integer.highestOneBit(n)