Posted to commits@spark.apache.org by rx...@apache.org on 2013/11/04 05:43:36 UTC

[3/5] git commit: Fixed a bug that caused the primitive arrays to use twice the amount of memory, due to a Scala compiler bug. Also addressed Matei's code review comment.

Fixed a bug that caused the primitive arrays to use twice the amount of memory, due to a Scala compiler bug.
Also addressed Matei's code review comment.
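
The memory fix follows one pattern throughout the new collection classes below: the
specialized array is declared uninitialized and then assigned in the constructor body,
so the Scala 2.9.x specialization phase emits a single array instead of one Object array
plus one primitive array. A minimal sketch of the pattern (the class name Buffer is
illustrative, not from the commit):

    class Buffer[@specialized(Long, Int) T: ClassManifest](capacity: Int) {
      // Declared without an initializer here ...
      private var _data: Array[T] = _
      // ... and assigned in the constructor body, which avoids the compiler
      // bug that would allocate a second, boxed array for the same field.
      _data = new Array[T](capacity)

      def apply(i: Int): T = _data(i)
      def update(i: Int, v: T) { _data(i) = v }
    }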


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1e9543b5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1e9543b5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1e9543b5

Branch: refs/heads/master
Commit: 1e9543b567b81cf3207984402269d230c10e713e
Parents: da6bb0a
Author: Reynold Xin <rx...@apache.org>
Authored: Sat Nov 2 23:19:01 2013 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Sat Nov 2 23:19:01 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     |  12 +-
 .../apache/spark/util/collection/BitSet.scala   | 103 +++++++
 .../spark/util/collection/OpenHashMap.scala     | 152 +++++++++++
 .../spark/util/collection/OpenHashSet.scala     | 267 +++++++++++++++++++
 .../collection/PrimitiveKeyOpenHashMap.scala    | 121 +++++++++
 .../org/apache/spark/util/hash/BitSet.scala     | 103 -------
 .../apache/spark/util/hash/OpenHashMap.scala    | 148 ----------
 .../apache/spark/util/hash/OpenHashSet.scala    | 265 ------------------
 .../util/hash/PrimitiveKeyOpenHashMap.scala     | 117 --------
 .../spark/util/collection/BitSetSuite.scala     |  73 +++++
 .../util/collection/OpenHashMapSuite.scala      | 148 ++++++++++
 .../util/collection/OpenHashSetSuite.scala      |  74 +++++
 .../PrimitiveKeyOpenHashSetSuite.scala          |  90 +++++++
 .../apache/spark/util/hash/BitSetSuite.scala    |  73 -----
 .../spark/util/hash/OpenHashMapSuite.scala      | 148 ----------
 .../spark/util/hash/OpenHashSetSuite.scala      |  74 -----
 .../hash/PrimitiveKeyOpenHashSetSuite.scala     |  90 -------
 17 files changed, 1033 insertions(+), 1025 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 0c5c12b..fe932d8 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -18,13 +18,12 @@
 package org.apache.spark.util
 
 import java.io._
-import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
+import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
 import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.regex.Pattern
+import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}
 
 import scala.collection.Map
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.io.Source
 
@@ -36,8 +35,7 @@ import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 import org.apache.spark.deploy.SparkHadoopUtil
 import java.nio.ByteBuffer
-import org.apache.spark.{SparkEnv, SparkException, Logging}
-import java.util.ConcurrentModificationException
+import org.apache.spark.{SparkException, Logging}
 
 
 /**
@@ -149,7 +147,7 @@ private[spark] object Utils extends Logging {
     return buf
   }
 
-  private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
+  private val shutdownDeletePaths = new scala.collection.mutable.HashSet[String]()
 
   // Register the path to be deleted via shutdown hook
   def registerShutdownDeleteDir(file: File) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
new file mode 100644
index 0000000..6604ec7
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+
+/**
+ * A simple, fixed-size bit set implementation. This implementation is fast because it avoids
+ * safety/bound checking.
+ */
+class BitSet(numBits: Int) {
+
+  private[this] val words = new Array[Long](bit2words(numBits))
+  private[this] val numWords = words.length
+
+  /**
+   * Sets the bit at the specified index to true.
+   * @param index the bit index
+   */
+  def set(index: Int) {
+    val bitmask = 1L << (index & 0x3f)  // mod 64 and shift
+    words(index >> 6) |= bitmask        // div by 64 and mask
+  }
+
+  /**
+   * Return the value of the bit with the specified index. The value is true if the bit with
+   * the index is currently set in this BitSet; otherwise, the result is false.
+   *
+   * @param index the bit index
+   * @return the value of the bit with the specified index
+   */
+  def get(index: Int): Boolean = {
+    val bitmask = 1L << (index & 0x3f)   // mod 64 and shift
+    (words(index >>> 6) & bitmask) != 0  // div by 64 and mask
+  }
+
+  /** Return the number of bits set to true in this BitSet. */
+  def cardinality(): Int = {
+    var sum = 0
+    var i = 0
+    while (i < numWords) {
+      sum += java.lang.Long.bitCount(words(i))
+      i += 1
+    }
+    sum
+  }
+
+  /**
+   * Returns the index of the first bit that is set to true that occurs on or after the
+   * specified starting index. If no such bit exists then -1 is returned.
+   *
+   * To iterate over the true bits in a BitSet, use the following loop:
+   *
+   *  for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
+   *    // operate on index i here
+   *  }
+   *
+   * @param fromIndex the index to start checking from (inclusive)
+   * @return the index of the next set bit, or -1 if there is no such bit
+   */
+  def nextSetBit(fromIndex: Int): Int = {
+    var wordIndex = fromIndex >> 6
+    if (wordIndex >= numWords) {
+      return -1
+    }
+
+    // Try to find the next set bit in the current word
+    val subIndex = fromIndex & 0x3f
+    var word = words(wordIndex) >> subIndex
+    if (word != 0) {
+      return (wordIndex << 6) + subIndex + java.lang.Long.numberOfTrailingZeros(word)
+    }
+
+    // Find the next set bit in the rest of the words
+    wordIndex += 1
+    while (wordIndex < numWords) {
+      word = words(wordIndex)
+      if (word != 0) {
+        return (wordIndex << 6) + java.lang.Long.numberOfTrailingZeros(word)
+      }
+      wordIndex += 1
+    }
+
+    -1
+  }
+
+  /** Return the number of longs it would take to hold numBits. */
+  private def bit2words(numBits: Int) = ((numBits - 1) >>> 6) + 1
+}
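
A brief usage sketch for the new BitSet (variable names are illustrative; all methods
shown are defined in the file above, and indices must stay below the size given at
construction since there is no bounds checking):

    val bits = new org.apache.spark.util.collection.BitSet(128)
    bits.set(3)
    bits.set(70)
    assert(bits.get(3) && bits.get(70))
    assert(bits.cardinality() == 2)

    // Iterate over the set bits, mirroring the loop in the scaladoc.
    var i = bits.nextSetBit(0)
    while (i >= 0) {
      println("bit " + i + " is set")
      i = bits.nextSetBit(i + 1)
    }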

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
new file mode 100644
index 0000000..ed117b2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+
+/**
+ * A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
+ * but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
+ * space overhead.
+ *
+ * Under the hood, it uses our OpenHashSet implementation.
+ */
+private[spark]
+class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest](
+    initialCapacity: Int)
+  extends Iterable[(K, V)]
+  with Serializable {
+
+  def this() = this(64)
+
+  protected var _keySet = new OpenHashSet[K](initialCapacity)
+
+  // Init in constructor (instead of in declaration) to work around a Scala compiler specialization
+  // bug that would generate two arrays (one for Object and one for specialized T).
+  private var _values: Array[V] = _
+  _values = new Array[V](_keySet.capacity)
+
+  @transient private var _oldValues: Array[V] = null
+
+  // Treat the null key differently so we can use nulls in "data" to represent empty items.
+  private var haveNullValue = false
+  private var nullValue: V = null.asInstanceOf[V]
+
+  override def size: Int = if (haveNullValue) _keySet.size + 1 else _keySet.size
+
+  /** Get the value for a given key */
+  def apply(k: K): V = {
+    if (k == null) {
+      nullValue
+    } else {
+      val pos = _keySet.getPos(k)
+      if (pos < 0) {
+        null.asInstanceOf[V]
+      } else {
+        _values(pos)
+      }
+    }
+  }
+
+  /** Set the value for a key */
+  def update(k: K, v: V) {
+    if (k == null) {
+      haveNullValue = true
+      nullValue = v
+    } else {
+      val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
+      _values(pos) = v
+      _keySet.rehashIfNeeded(k, grow, move)
+      _oldValues = null
+    }
+  }
+
+  /**
+   * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
+   * set its value to mergeValue(oldValue).
+   *
+   * @return the newly updated value.
+   */
+  def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
+    if (k == null) {
+      if (haveNullValue) {
+        nullValue = mergeValue(nullValue)
+      } else {
+        haveNullValue = true
+        nullValue = defaultValue
+      }
+      nullValue
+    } else {
+      val pos = _keySet.addWithoutResize(k)
+      if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
+        val newValue = defaultValue
+        _values(pos & OpenHashSet.POSITION_MASK) = newValue
+        _keySet.rehashIfNeeded(k, grow, move)
+        newValue
+      } else {
+        _values(pos) = mergeValue(_values(pos))
+        _values(pos)
+      }
+    }
+  }
+
+  override def iterator = new Iterator[(K, V)] {
+    var pos = -1
+    var nextPair: (K, V) = computeNextPair()
+
+    /** Get the next value we should return from next(), or null if we're finished iterating */
+    def computeNextPair(): (K, V) = {
+      if (pos == -1) {    // Treat position -1 as looking at the null value
+        if (haveNullValue) {
+          pos += 1
+          return (null.asInstanceOf[K], nullValue)
+        }
+        pos += 1
+      }
+      pos = _keySet.nextPos(pos)
+      if (pos >= 0) {
+        val ret = (_keySet.getValue(pos), _values(pos))
+        pos += 1
+        ret
+      } else {
+        null
+      }
+    }
+
+    def hasNext = nextPair != null
+
+    def next() = {
+      val pair = nextPair
+      nextPair = computeNextPair()
+      pair
+    }
+  }
+
+  // The following member variables are declared as protected instead of private for the
+  // specialization to work (specialized class extends the non-specialized one and needs access
+  // to the "private" variables).
+  // They also should have been val's. We use var's because there is a Scala compiler bug that
+  // would throw illegal access error at runtime if they are declared as val's.
+  protected var grow = (newCapacity: Int) => {
+    _oldValues = _values
+    _values = new Array[V](newCapacity)
+  }
+
+  protected var move = (oldPos: Int, newPos: Int) => {
+    _values(newPos) = _oldValues(oldPos)
+  }
+}
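
A usage sketch for OpenHashMap (illustrative only; the class is private[spark], so
callers must sit inside the org.apache.spark package):

    val counts = new OpenHashMap[String, Int](64)
    counts("spark") = 1                       // update
    counts(null) = 7                          // the null key is stored separately
    counts.changeValue("spark", 1, _ + 1)     // existing key: merged to 2
    counts.changeValue("hadoop", 1, _ + 1)    // absent key: set to the default, 1
    for ((k, v) <- counts) {
      println(k + " -> " + v)
    }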

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
new file mode 100644
index 0000000..e98a93d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+
+/**
+ * A simple, fast hash set optimized for non-null insertion-only use case, where keys are never
+ * removed.
+ *
+ * The underlying implementation uses Scala compiler's specialization to generate optimized
+ * storage for two primitive types (Long and Int). It is much faster than Java's standard HashSet
+ * while incurring much less memory overhead. This can serve as building blocks for higher level
+ * data structures such as an optimized HashMap.
+ *
+ * This OpenHashSet is designed to serve as building blocks for higher level data structures
+ * such as an optimized hash map. Compared with standard hash set implementations, this class
+ * provides its various callbacks interfaces (e.g. allocateFunc, moveFunc) and interfaces to
+ * retrieve the position of a key in the underlying array.
+ *
+ * It uses quadratic probing with a power-of-2 hash table size, which is guaranteed
+ * to explore all spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
+ */
+private[spark]
+class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
+    initialCapacity: Int,
+    loadFactor: Double)
+  extends Serializable {
+
+  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+  require(initialCapacity >= 1, "Invalid initial capacity")
+
+  import OpenHashSet._
+
+  def this(initialCapacity: Int) = this(initialCapacity, 0.7)
+
+  def this() = this(64)
+
+  // The following member variables are declared as protected instead of private for the
+  // specialization to work (specialized class extends the non-specialized one and needs access
+  // to the "private" variables).
+
+  protected val hasher: Hasher[T] = {
+    // It would've been more natural to write the following using pattern matching. But Scala 2.9.x
+    // compiler has a bug when specialization is used together with this pattern matching, and
+    // throws:
+    // scala.tools.nsc.symtab.Types$TypeError: type mismatch;
+    //  found   : scala.reflect.AnyValManifest[Long]
+    //  required: scala.reflect.ClassManifest[Int]
+    //         at scala.tools.nsc.typechecker.Contexts$Context.error(Contexts.scala:298)
+    //         at scala.tools.nsc.typechecker.Infer$Inferencer.error(Infer.scala:207)
+    //         ...
+    val mt = classManifest[T]
+    if (mt == ClassManifest.Long) {
+      (new LongHasher).asInstanceOf[Hasher[T]]
+    } else if (mt == ClassManifest.Int) {
+      (new IntHasher).asInstanceOf[Hasher[T]]
+    } else {
+      new Hasher[T]
+    }
+  }
+
+  protected var _capacity = nextPowerOf2(initialCapacity)
+  protected var _mask = _capacity - 1
+  protected var _size = 0
+
+  protected var _bitset = new BitSet(_capacity)
+
+  // Init of the array in constructor (instead of in declaration) to work around a Scala compiler
+  // specialization bug that would generate two arrays (one for Object and one for specialized T).
+  protected var _data: Array[T] = _
+  _data = new Array[T](_capacity)
+
+  /** Number of elements in the set. */
+  def size: Int = _size
+
+  /** The capacity of the set (i.e. size of the underlying array). */
+  def capacity: Int = _capacity
+
+  /** Return true if this set contains the specified element. */
+  def contains(k: T): Boolean = getPos(k) != INVALID_POS
+
+  /**
+   * Add an element to the set. If the set is over capacity after the insertion, grow the set
+   * and rehash all elements.
+   */
+  def add(k: T) {
+    addWithoutResize(k)
+    rehashIfNeeded(k, grow, move)
+  }
+
+  /**
+   * Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
+   * The caller is responsible for calling rehashIfNeeded.
+   *
+   * Use (retval & POSITION_MASK) to get the actual position, and
+   * (retval & EXISTENCE_MASK) != 0 for prior existence.
+   *
+   * @return The position where the key is placed, plus the highest order bit is set if the key
+   *         exists previously.
+   */
+  def addWithoutResize(k: T): Int = putInto(_bitset, _data, k)
+
+  /**
+   * Rehash the set if it is overloaded.
+   * @param k A parameter unused in the function, but to force the Scala compiler to specialize
+   *          this method.
+   * @param allocateFunc Closure invoked when we are allocating a new, larger array.
+   * @param moveFunc Closure invoked when we move the key from one position (in the old data array)
+   *                 to a new position (in the new data array).
+   */
+  def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
+    if (_size > loadFactor * _capacity) {
+      rehash(k, allocateFunc, moveFunc)
+    }
+  }
+
+  /** Return the position of the element in the underlying array. */
+  def getPos(k: T): Int = {
+    var pos = hashcode(hasher.hash(k)) & _mask
+    var i = 1
+    while (true) {
+      if (!_bitset.get(pos)) {
+        return INVALID_POS
+      } else if (k == _data(pos)) {
+        return pos
+      } else {
+        val delta = i
+        pos = (pos + delta) & _mask
+        i += 1
+      }
+    }
+    // Never reached here
+    INVALID_POS
+  }
+
+  /** Return the value at the specified position. */
+  def getValue(pos: Int): T = _data(pos)
+
+  /**
+   * Return the next position with an element stored, starting from the given position inclusively.
+   */
+  def nextPos(fromPos: Int): Int = _bitset.nextSetBit(fromPos)
+
+  /**
+   * Put an entry into the set. Return the position where the key is placed. In addition, the
+   * highest bit in the returned position is set if the key exists prior to this put.
+   *
+   * This function assumes the data array has at least one empty slot.
+   */
+  private def putInto(bitset: BitSet, data: Array[T], k: T): Int = {
+    val mask = data.length - 1
+    var pos = hashcode(hasher.hash(k)) & mask
+    var i = 1
+    while (true) {
+      if (!bitset.get(pos)) {
+        // This is a new key.
+        data(pos) = k
+        bitset.set(pos)
+        _size += 1
+        return pos | EXISTENCE_MASK
+      } else if (data(pos) == k) {
+        // Found an existing key.
+        return pos
+      } else {
+        val delta = i
+        pos = (pos + delta) & mask
+        i += 1
+      }
+    }
+    // Never reached here
+    assert(INVALID_POS != INVALID_POS)
+    INVALID_POS
+  }
+
+  /**
+   * Double the table's size and re-hash everything. We are not really using k, but it is declared
+   * so Scala compiler can specialize this method (which leads to calling the specialized version
+   * of putInto).
+   *
+   * @param k A parameter unused in the function, but to force the Scala compiler to specialize
+   *          this method.
+   * @param allocateFunc Closure invoked when we are allocating a new, larger array.
+   * @param moveFunc Closure invoked when we move the key from one position (in the old data array)
+   *                 to a new position (in the new data array).
+   */
+  private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
+    val newCapacity = _capacity * 2
+    require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
+
+    allocateFunc(newCapacity)
+    val newData = classManifest[T].newArray(newCapacity)
+    val newBitset = new BitSet(newCapacity)
+    var pos = 0
+    _size = 0
+    while (pos < _capacity) {
+      if (_bitset.get(pos)) {
+        val newPos = putInto(newBitset, newData, _data(pos))
+        moveFunc(pos, newPos & POSITION_MASK)
+      }
+      pos += 1
+    }
+    _bitset = newBitset
+    _data = newData
+    _capacity = newCapacity
+    _mask = newCapacity - 1
+  }
+
+  /**
+   * Re-hash a value to deal better with hash functions that don't differ
+   * in the lower bits, similar to java.util.HashMap
+   */
+  private def hashcode(h: Int): Int = {
+    val r = h ^ (h >>> 20) ^ (h >>> 12)
+    r ^ (r >>> 7) ^ (r >>> 4)
+  }
+
+  private def nextPowerOf2(n: Int): Int = {
+    val highBit = Integer.highestOneBit(n)
+    if (highBit == n) n else highBit << 1
+  }
+}
+
+
+private[spark]
+object OpenHashSet {
+
+  val INVALID_POS = -1
+  val EXISTENCE_MASK = 0x80000000
+  val POSITION_MASK = 0xEFFFFFF
+
+  /**
+   * A set of specialized hash function implementation to avoid boxing hash code computation
+   * in the specialized implementation of OpenHashSet.
+   */
+  sealed class Hasher[@specialized(Long, Int) T] {
+    def hash(o: T): Int = o.hashCode()
+  }
+
+  class LongHasher extends Hasher[Long] {
+    override def hash(o: Long): Int = (o ^ (o >>> 32)).toInt
+  }
+
+  class IntHasher extends Hasher[Int] {
+    override def hash(o: Int): Int = o
+  }
+
+  private def grow1(newSize: Int) {}
+  private def move1(oldPos: Int, newPos: Int) { }
+
+  private val grow = grow1 _
+  private val move = move1 _
+}
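
A usage sketch for OpenHashSet (illustrative; the class is private[spark], keys must be
non-null, and removals are not supported):

    val set = new OpenHashSet[Long](16)
    set.add(42L)
    set.add(42L)                              // duplicates are ignored
    assert(set.size == 1 && set.contains(42L))

    // Position-based access is what makes the set usable as a building block
    // for the hash maps above: getPos finds the slot, getValue reads it back.
    val pos = set.getPos(42L)
    if (pos != OpenHashSet.INVALID_POS) {
      println(set.getValue(pos))              // 42
    }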

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
new file mode 100644
index 0000000..e8f28ec
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+
+/**
+ * A fast hash map implementation for primitive, non-null keys. This hash map supports
+ * insertions and updates, but not deletions. This map is about an order of magnitude
+ * faster than java.util.HashMap, while using much less space overhead.
+ *
+ * Under the hood, it uses our OpenHashSet implementation.
+ */
+private[spark]
+class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
+                              @specialized(Long, Int, Double) V: ClassManifest](
+    initialCapacity: Int)
+  extends Iterable[(K, V)]
+  with Serializable {
+
+  def this() = this(64)
+
+  require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
+
+  // Init in constructor (instead of in declaration) to work around a Scala compiler specialization
+  // bug that would generate two arrays (one for Object and one for specialized T).
+  protected var _keySet: OpenHashSet[K] = _
+  private var _values: Array[V] = _
+  _keySet = new OpenHashSet[K](initialCapacity)
+  _values = new Array[V](_keySet.capacity)
+
+  private var _oldValues: Array[V] = null
+
+  override def size = _keySet.size
+
+  /** Get the value for a given key */
+  def apply(k: K): V = {
+    val pos = _keySet.getPos(k)
+    _values(pos)
+  }
+
+  /** Set the value for a key */
+  def update(k: K, v: V) {
+    val pos = _keySet.addWithoutResize(k) & OpenHashSet.POSITION_MASK
+    _values(pos) = v
+    _keySet.rehashIfNeeded(k, grow, move)
+    _oldValues = null
+  }
+
+  /**
+   * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
+   * set its value to mergeValue(oldValue).
+   *
+   * @return the newly updated value.
+   */
+  def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
+    val pos = _keySet.addWithoutResize(k)
+    if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
+      val newValue = defaultValue
+      _values(pos & OpenHashSet.POSITION_MASK) = newValue
+      _keySet.rehashIfNeeded(k, grow, move)
+      newValue
+    } else {
+      _values(pos) = mergeValue(_values(pos))
+      _values(pos)
+    }
+  }
+
+  override def iterator = new Iterator[(K, V)] {
+    var pos = 0
+    var nextPair: (K, V) = computeNextPair()
+
+    /** Get the next value we should return from next(), or null if we're finished iterating */
+    def computeNextPair(): (K, V) = {
+      pos = _keySet.nextPos(pos)
+      if (pos >= 0) {
+        val ret = (_keySet.getValue(pos), _values(pos))
+        pos += 1
+        ret
+      } else {
+        null
+      }
+    }
+
+    def hasNext = nextPair != null
+
+    def next() = {
+      val pair = nextPair
+      nextPair = computeNextPair()
+      pair
+    }
+  }
+
+  // The following member variables are declared as protected instead of private for the
+  // specialization to work (specialized class extends the unspecialized one and needs access
+  // to the "private" variables).
+  // They also should have been val's. We use var's because there is a Scala compiler bug that
+  // would throw illegal access error at runtime if they are declared as val's.
+  protected var grow = (newCapacity: Int) => {
+    _oldValues = _values
+    _values = new Array[V](newCapacity)
+  }
+
+  protected var move = (oldPos: Int, newPos: Int) => {
+    _values(newPos) = _oldValues(oldPos)
+  }
+}
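
A usage sketch for PrimitiveKeyOpenHashMap (illustrative; keys must be Long or Int, as
enforced by the require in the constructor, and both keys and values are stored unboxed
through specialization):

    val degrees = new PrimitiveKeyOpenHashMap[Long, Int](64)
    degrees(100L) = 3
    degrees.changeValue(100L, 1, _ + 1)       // existing key: merged to 4
    degrees.changeValue(200L, 1, _ + 1)       // absent key: set to the default, 1
    for ((id, deg) <- degrees) {
      println(id + " has degree " + deg)
    }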

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala b/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala
deleted file mode 100644
index 0ec002b..0000000
--- a/core/src/main/scala/org/apache/spark/util/hash/BitSet.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.hash
-
-
-/**
- * A simple, fixed-size bit set implementation. This implementation is fast because it avoids
- * safety/bound checking.
- */
-class BitSet(numBits: Int) {
-
-  private val words = new Array[Long](bit2words(numBits))
-  private val numWords = words.length
-
-  /**
-   * Sets the bit at the specified index to true.
-   * @param index the bit index
-   */
-  def set(index: Int) {
-    val bitmask = 1L << (index & 0x3f)  // mod 64 and shift
-    words(index >> 6) |= bitmask        // div by 64 and mask
-  }
-
-  /**
-   * Return the value of the bit with the specified index. The value is true if the bit with
-   * the index is currently set in this BitSet; otherwise, the result is false.
-   *
-   * @param index the bit index
-   * @return the value of the bit with the specified index
-   */
-  def get(index: Int): Boolean = {
-    val bitmask = 1L << (index & 0x3f)   // mod 64 and shift
-    (words(index >>> 6) & bitmask) != 0  // div by 64 and mask
-  }
-
-  /** Return the number of bits set to true in this BitSet. */
-  def cardinality(): Int = {
-    var sum = 0
-    var i = 0
-    while (i < numWords) {
-      sum += java.lang.Long.bitCount(words(i))
-      i += 1
-    }
-    sum
-  }
-
-  /**
-   * Returns the index of the first bit that is set to true that occurs on or after the
-   * specified starting index. If no such bit exists then -1 is returned.
-   *
-   * To iterate over the true bits in a BitSet, use the following loop:
-   *
-   *  for (int i = bs.nextSetBit(0); i >= 0; i = bs.nextSetBit(i+1)) {
-   *    // operate on index i here
-   *  }
-   *
-   * @param fromIndex the index to start checking from (inclusive)
-   * @return the index of the next set bit, or -1 if there is no such bit
-   */
-  def nextSetBit(fromIndex: Int): Int = {
-    var wordIndex = fromIndex >> 6
-    if (wordIndex >= numWords) {
-      return -1
-    }
-
-    // Try to find the next set bit in the current word
-    val subIndex = fromIndex & 0x3f
-    var word = words(wordIndex) >> subIndex
-    if (word != 0) {
-      return (wordIndex << 6) + subIndex + java.lang.Long.numberOfTrailingZeros(word)
-    }
-
-    // Find the next set bit in the rest of the words
-    wordIndex += 1
-    while (wordIndex < numWords) {
-      word = words(wordIndex)
-      if (word != 0) {
-        return (wordIndex << 6) + java.lang.Long.numberOfTrailingZeros(word)
-      }
-      wordIndex += 1
-    }
-
-    -1
-  }
-
-  /** Return the number of longs it would take to hold numBits. */
-  private def bit2words(numBits: Int) = ((numBits - 1) >>> 6) + 1
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
deleted file mode 100644
index a376d10..0000000
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashMap.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.hash
-
-
-/**
- * A fast hash map implementation for nullable keys. This hash map supports insertions and updates,
- * but not deletions. This map is about 5X faster than java.util.HashMap, while using much less
- * space overhead.
- *
- * Under the hood, it uses our OpenHashSet implementation.
- */
-private[spark]
-class OpenHashMap[K >: Null : ClassManifest, @specialized(Long, Int, Double) V: ClassManifest](
-    initialCapacity: Int)
-  extends Iterable[(K, V)]
-  with Serializable {
-
-  def this() = this(64)
-
-  protected var _keySet = new OpenHashSet[K](initialCapacity)
-  private var _values = new Array[V](_keySet.capacity)
-
-  @transient private var _oldValues: Array[V] = null
-
-  // Treat the null key differently so we can use nulls in "data" to represent empty items.
-  private var haveNullValue = false
-  private var nullValue: V = null.asInstanceOf[V]
-
-  override def size: Int = if (haveNullValue) _keySet.size + 1 else _keySet.size
-
-  /** Get the value for a given key */
-  def apply(k: K): V = {
-    if (k == null) {
-      nullValue
-    } else {
-      val pos = _keySet.getPos(k)
-      if (pos < 0) {
-        null.asInstanceOf[V]
-      } else {
-        _values(pos)
-      }
-    }
-  }
-
-  /** Set the value for a key */
-  def update(k: K, v: V) {
-    if (k == null) {
-      haveNullValue = true
-      nullValue = v
-    } else {
-      val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
-      _values(pos) = v
-      _keySet.rehashIfNeeded(k, grow, move)
-      _oldValues = null
-    }
-  }
-
-  /**
-   * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
-   * set its value to mergeValue(oldValue).
-   *
-   * @return the newly updated value.
-   */
-  def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
-    if (k == null) {
-      if (haveNullValue) {
-        nullValue = mergeValue(nullValue)
-      } else {
-        haveNullValue = true
-        nullValue = defaultValue
-      }
-      nullValue
-    } else {
-      val pos = _keySet.fastAdd(k)
-      if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
-        val newValue = defaultValue
-        _values(pos & OpenHashSet.POSITION_MASK) = newValue
-        _keySet.rehashIfNeeded(k, grow, move)
-        newValue
-      } else {
-        _values(pos) = mergeValue(_values(pos))
-        _values(pos)
-      }
-    }
-  }
-
-  override def iterator = new Iterator[(K, V)] {
-    var pos = -1
-    var nextPair: (K, V) = computeNextPair()
-
-    /** Get the next value we should return from next(), or null if we're finished iterating */
-    def computeNextPair(): (K, V) = {
-      if (pos == -1) {    // Treat position -1 as looking at the null value
-        if (haveNullValue) {
-          pos += 1
-          return (null.asInstanceOf[K], nullValue)
-        }
-        pos += 1
-      }
-      pos = _keySet.nextPos(pos)
-      if (pos >= 0) {
-        val ret = (_keySet.getValue(pos), _values(pos))
-        pos += 1
-        ret
-      } else {
-        null
-      }
-    }
-
-    def hasNext = nextPair != null
-
-    def next() = {
-      val pair = nextPair
-      nextPair = computeNextPair()
-      pair
-    }
-  }
-
-  // The following member variables are declared as protected instead of private for the
-  // specialization to work (specialized class extends the non-specialized one and needs access
-  // to the "private" variables).
-  // They also should have been val's. We use var's because there is a Scala compiler bug that
-  // would throw illegal access error at runtime if they are declared as val's.
-  protected var grow = (newCapacity: Int) => {
-    _oldValues = _values
-    _values = new Array[V](newCapacity)
-  }
-
-  protected var move = (oldPos: Int, newPos: Int) => {
-    _values(newPos) = _oldValues(oldPos)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
deleted file mode 100644
index 7aa3f62..0000000
--- a/core/src/main/scala/org/apache/spark/util/hash/OpenHashSet.scala
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.hash
-
-
-/**
- * A simple, fast hash set optimized for non-null insertion-only use case, where keys are never
- * removed.
- *
- * The underlying implementation uses Scala compiler's specialization to generate optimized
- * storage for two primitive types (Long and Int). It is much faster than Java's standard HashSet
- * while incurring much less memory overhead. This can serve as building blocks for higher level
- * data structures such as an optimized HashMap.
- *
- * This OpenHashSet is designed to serve as building blocks for higher level data structures
- * such as an optimized hash map. Compared with standard hash set implementations, this class
- * provides its various callbacks interfaces (e.g. allocateFunc, moveFunc) and interfaces to
- * retrieve the position of a key in the underlying array.
- *
- * It uses quadratic probing with a power-of-2 hash table size, which is guaranteed
- * to explore all spaces for each key (see http://en.wikipedia.org/wiki/Quadratic_probing).
- */
-private[spark]
-class OpenHashSet[@specialized(Long, Int) T: ClassManifest](
-    initialCapacity: Int,
-    loadFactor: Double)
-  extends Serializable {
-
-  require(initialCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
-  require(initialCapacity >= 1, "Invalid initial capacity")
-
-  import OpenHashSet._
-
-  def this(initialCapacity: Int) = this(initialCapacity, 0.7)
-
-  def this() = this(64)
-
-  // The following member variables are declared as protected instead of private for the
-  // specialization to work (specialized class extends the non-specialized one and needs access
-  // to the "private" variables).
-
-  protected val hasher: Hasher[T] = {
-    // It would've been more natural to write the following using pattern matching. But Scala 2.9.x
-    // compiler has a bug when specialization is used together with this pattern matching, and
-    // throws:
-    // scala.tools.nsc.symtab.Types$TypeError: type mismatch;
-    //  found   : scala.reflect.AnyValManifest[Long]
-    //  required: scala.reflect.ClassManifest[Int]
-    //         at scala.tools.nsc.typechecker.Contexts$Context.error(Contexts.scala:298)
-    //         at scala.tools.nsc.typechecker.Infer$Inferencer.error(Infer.scala:207)
-    //         ...
-    val mt = classManifest[T]
-    if (mt == ClassManifest.Long) {
-      (new LongHasher).asInstanceOf[Hasher[T]]
-    } else if (mt == ClassManifest.Int) {
-      (new IntHasher).asInstanceOf[Hasher[T]]
-    } else {
-      new Hasher[T]
-    }
-  }
-
-  protected var _capacity = nextPowerOf2(initialCapacity)
-  protected var _mask = _capacity - 1
-  protected var _size = 0
-
-  protected var _data = classManifest[T].newArray(_capacity)
-  protected var _bitset = new BitSet(_capacity)
-
-  /** Number of elements in the set. */
-  def size: Int = _size
-
-  /** The capacity of the set (i.e. size of the underlying array). */
-  def capacity: Int = _capacity
-
-  /** Return true if this set contains the specified element. */
-  def contains(k: T): Boolean = getPos(k) != INVALID_POS
-
-  /**
-   * Add an element to the set. If the set is over capacity after the insertion, grow the set
-   * and rehash all elements.
-   */
-  def add(k: T) {
-    fastAdd(k)
-    rehashIfNeeded(k, grow, move)
-  }
-
-  /**
-   * Add an element to the set. This one differs from add in that it doesn't trigger rehashing.
-   * The caller is responsible for calling rehashIfNeeded.
-   *
-   * Use (retval & POSITION_MASK) to get the actual position, and
-   * (retval & EXISTENCE_MASK) != 0 for prior existence.
-   *
-   * @return The position where the key is placed, plus the highest order bit is set if the key
-   *         exists previously.
-   */
-  def fastAdd(k: T): Int = putInto(_bitset, _data, k)
-
-  /**
-   * Rehash the set if it is overloaded.
-   * @param k A parameter unused in the function, but to force the Scala compiler to specialize
-   *          this method.
-   * @param allocateFunc Closure invoked when we are allocating a new, larger array.
-   * @param moveFunc Closure invoked when we move the key from one position (in the old data array)
-   *                 to a new position (in the new data array).
-   */
-  def rehashIfNeeded(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-    if (_size > loadFactor * _capacity) {
-      rehash(k, allocateFunc, moveFunc)
-    }
-  }
-
-  /** Return the position of the element in the underlying array. */
-  def getPos(k: T): Int = {
-    var pos = hashcode(hasher.hash(k)) & _mask
-    var i = 1
-    while (true) {
-      if (!_bitset.get(pos)) {
-        return INVALID_POS
-      } else if (k == _data(pos)) {
-        return pos
-      } else {
-        val delta = i
-        pos = (pos + delta) & _mask
-        i += 1
-      }
-    }
-    // Never reached here
-    INVALID_POS
-  }
-
-  /** Return the value at the specified position. */
-  def getValue(pos: Int): T = _data(pos)
-
-  /**
-   * Return the next position with an element stored, starting from the given position inclusively.
-   */
-  def nextPos(fromPos: Int): Int = _bitset.nextSetBit(fromPos)
-
-  /**
-   * Put an entry into the set. Return the position where the key is placed. In addition, the
-   * highest bid in the returned position is set if the key exists prior to this put.
-   *
-   * This function assumes the data array has at least one empty slot.
-   */
-  private def putInto(bitset: BitSet, data: Array[T], k: T): Int = {
-    val mask = data.length - 1
-    var pos = hashcode(hasher.hash(k)) & mask
-    var i = 1
-    while (true) {
-      if (!bitset.get(pos)) {
-        // This is a new key.
-        data(pos) = k
-        bitset.set(pos)
-        _size += 1
-        return pos | EXISTENCE_MASK
-      } else if (data(pos) == k) {
-        // Found an existing key.
-        return pos
-      } else {
-        val delta = i
-        pos = (pos + delta) & mask
-        i += 1
-      }
-    }
-    // Never reached here
-    assert(INVALID_POS != INVALID_POS)
-    INVALID_POS
-  }
-
-  /**
-   * Double the table's size and re-hash everything. We are not really using k, but it is declared
-   * so Scala compiler can specialize this method (which leads to calling the specialized version
-   * of putInto).
-   *
-   * @param k A parameter unused in the function, but to force the Scala compiler to specialize
-   *          this method.
-   * @param allocateFunc Closure invoked when we are allocating a new, larger array.
-   * @param moveFunc Closure invoked when we move the key from one position (in the old data array)
-   *                 to a new position (in the new data array).
-   */
-  private def rehash(k: T, allocateFunc: (Int) => Unit, moveFunc: (Int, Int) => Unit) {
-    val newCapacity = _capacity * 2
-    require(newCapacity <= (1 << 29), "Can't make capacity bigger than 2^29 elements")
-
-    allocateFunc(newCapacity)
-    val newData = classManifest[T].newArray(newCapacity)
-    val newBitset = new BitSet(newCapacity)
-    var pos = 0
-    _size = 0
-    while (pos < _capacity) {
-      if (_bitset.get(pos)) {
-        val newPos = putInto(newBitset, newData, _data(pos))
-        moveFunc(pos, newPos & POSITION_MASK)
-      }
-      pos += 1
-    }
-    _bitset = newBitset
-    _data = newData
-    _capacity = newCapacity
-    _mask = newCapacity - 1
-  }
-
-  /**
-   * Re-hash a value to deal better with hash functions that don't differ
-   * in the lower bits, similar to java.util.HashMap
-   */
-  private def hashcode(h: Int): Int = {
-    val r = h ^ (h >>> 20) ^ (h >>> 12)
-    r ^ (r >>> 7) ^ (r >>> 4)
-  }
-
-  private def nextPowerOf2(n: Int): Int = {
-    val highBit = Integer.highestOneBit(n)
-    if (highBit == n) n else highBit << 1
-  }
-}
-
-
-private[spark]
-object OpenHashSet {
-
-  val INVALID_POS = -1
-
-  val EXISTENCE_MASK = 0x80000000
-
-  val POSITION_MASK = 0xEFFFFFF
-
-  /**
-   * A set of specialized hash function implementation to avoid boxing hash code computation
-   * in the specialized implementation of OpenHashSet.
-   */
-  sealed class Hasher[@specialized(Long, Int) T] {
-    def hash(o: T): Int = o.hashCode()
-  }
-
-  class LongHasher extends Hasher[Long] {
-    override def hash(o: Long): Int = (o ^ (o >>> 32)).toInt
-  }
-
-  class IntHasher extends Hasher[Int] {
-    override def hash(o: Int): Int = o
-  }
-
-  private def grow1(newSize: Int) {}
-  private def move1(oldPos: Int, newPos: Int) { }
-
-  private val grow = grow1 _
-  private val move = move1 _
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
deleted file mode 100644
index 14c1367..0000000
--- a/core/src/main/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashMap.scala
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.hash
-
-
-/**
- * A fast hash map implementation for primitive, non-null keys. This hash map supports
- * insertions and updates, but not deletions. This map is about an order of magnitude
- * faster than java.util.HashMap, while using much less space overhead.
- *
- * Under the hood, it uses our OpenHashSet implementation.
- */
-private[spark]
-class PrimitiveKeyOpenHashMap[@specialized(Long, Int) K: ClassManifest,
-                              @specialized(Long, Int, Double) V: ClassManifest](
-    initialCapacity: Int)
-  extends Iterable[(K, V)]
-  with Serializable {
-
-  def this() = this(64)
-
-  require(classManifest[K] == classManifest[Long] || classManifest[K] == classManifest[Int])
-
-  protected var _keySet = new OpenHashSet[K](initialCapacity)
-  private var _values = new Array[V](_keySet.capacity)
-
-  private var _oldValues: Array[V] = null
-
-  override def size = _keySet.size
-
-  /** Get the value for a given key */
-  def apply(k: K): V = {
-    val pos = _keySet.getPos(k)
-    _values(pos)
-  }
-
-  /** Set the value for a key */
-  def update(k: K, v: V) {
-    val pos = _keySet.fastAdd(k) & OpenHashSet.POSITION_MASK
-    _values(pos) = v
-    _keySet.rehashIfNeeded(k, grow, move)
-    _oldValues = null
-  }
-
-  /**
-   * If the key doesn't exist yet in the hash map, set its value to defaultValue; otherwise,
-   * set its value to mergeValue(oldValue).
-   *
-   * @return the newly updated value.
-   */
-  def changeValue(k: K, defaultValue: => V, mergeValue: (V) => V): V = {
-    val pos = _keySet.fastAdd(k)
-    if ((pos & OpenHashSet.EXISTENCE_MASK) != 0) {
-      val newValue = defaultValue
-      _values(pos & OpenHashSet.POSITION_MASK) = newValue
-      _keySet.rehashIfNeeded(k, grow, move)
-      newValue
-    } else {
-      _values(pos) = mergeValue(_values(pos))
-      _values(pos)
-    }
-  }
-
-  override def iterator = new Iterator[(K, V)] {
-    var pos = 0
-    var nextPair: (K, V) = computeNextPair()
-
-    /** Get the next value we should return from next(), or null if we're finished iterating */
-    def computeNextPair(): (K, V) = {
-      pos = _keySet.nextPos(pos)
-      if (pos >= 0) {
-        val ret = (_keySet.getValue(pos), _values(pos))
-        pos += 1
-        ret
-      } else {
-        null
-      }
-    }
-
-    def hasNext = nextPair != null
-
-    def next() = {
-      val pair = nextPair
-      nextPair = computeNextPair()
-      pair
-    }
-  }
-
-  // The following member variables are declared as protected instead of private for the
-  // specialization to work (specialized class extends the unspecialized one and needs access
-  // to the "private" variables).
-  // They also should have been val's. We use var's because there is a Scala compiler bug that
-  // would throw illegal access error at runtime if they are declared as val's.
-  protected var grow = (newCapacity: Int) => {
-    _oldValues = _values
-    _values = new Array[V](newCapacity)
-  }
-
-  protected var move = (oldPos: Int, newPos: Int) => {
-    _values(newPos) = _oldValues(oldPos)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
new file mode 100644
index 0000000..0f1ab3d
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import org.scalatest.FunSuite
+
+
+class BitSetSuite extends FunSuite {
+
+  test("basic set and get") {
+    val setBits = Seq(0, 9, 1, 10, 90, 96)
+    val bitset = new BitSet(100)
+
+    for (i <- 0 until 100) {
+      assert(!bitset.get(i))
+    }
+
+    setBits.foreach(i => bitset.set(i))
+
+    for (i <- 0 until 100) {
+      if (setBits.contains(i)) {
+        assert(bitset.get(i))
+      } else {
+        assert(!bitset.get(i))
+      }
+    }
+    assert(bitset.cardinality() === setBits.size)
+  }
+
+  test("100% full bit set") {
+    val bitset = new BitSet(10000)
+    for (i <- 0 until 10000) {
+      assert(!bitset.get(i))
+      bitset.set(i)
+    }
+    for (i <- 0 until 10000) {
+      assert(bitset.get(i))
+    }
+    assert(bitset.cardinality() === 10000)
+  }
+
+  test("nextSetBit") {
+    val setBits = Seq(0, 9, 1, 10, 90, 96)
+    val bitset = new BitSet(100)
+    setBits.foreach(i => bitset.set(i))
+
+    assert(bitset.nextSetBit(0) === 0)
+    assert(bitset.nextSetBit(1) === 1)
+    assert(bitset.nextSetBit(2) === 9)
+    assert(bitset.nextSetBit(9) === 9)
+    assert(bitset.nextSetBit(10) === 10)
+    assert(bitset.nextSetBit(11) === 90)
+    assert(bitset.nextSetBit(80) === 90)
+    assert(bitset.nextSetBit(91) === 96)
+    assert(bitset.nextSetBit(96) === 96)
+    assert(bitset.nextSetBit(97) === -1)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
new file mode 100644
index 0000000..5e74ca1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashMapSuite.scala
@@ -0,0 +1,148 @@
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.HashSet
+import org.scalatest.FunSuite
+
+class OpenHashMapSuite extends FunSuite {
+
+  test("initialization") {
+    val goodMap1 = new OpenHashMap[String, Int](1)
+    assert(goodMap1.size === 0)
+    val goodMap2 = new OpenHashMap[String, Int](255)
+    assert(goodMap2.size === 0)
+    val goodMap3 = new OpenHashMap[String, String](256)
+    assert(goodMap3.size === 0)
+    intercept[IllegalArgumentException] {
+      new OpenHashMap[String, Int](1 << 30) // Invalid map size: bigger than 2^29
+    }
+    intercept[IllegalArgumentException] {
+      new OpenHashMap[String, Int](-1)
+    }
+    intercept[IllegalArgumentException] {
+      new OpenHashMap[String, String](0)
+    }
+  }
+
+  test("primitive value") {
+    val map = new OpenHashMap[String, Int]
+
+    for (i <- 1 to 1000) {
+      map(i.toString) = i
+      assert(map(i.toString) === i)
+    }
+
+    assert(map.size === 1000)
+    assert(map(null) === 0)
+
+    map(null) = -1
+    assert(map.size === 1001)
+    assert(map(null) === -1)
+
+    for (i <- 1 to 1000) {
+      assert(map(i.toString) === i)
+    }
+
+    // Test iterator
+    val set = new HashSet[(String, Int)]
+    for ((k, v) <- map) {
+      set.add((k, v))
+    }
+    val expected = (1 to 1000).map(x => (x.toString, x)) :+ (null.asInstanceOf[String], -1)
+    assert(set === expected.toSet)
+  }
+
+  test("non-primitive value") {
+    val map = new OpenHashMap[String, String]
+
+    for (i <- 1 to 1000) {
+      map(i.toString) = i.toString
+      assert(map(i.toString) === i.toString)
+    }
+
+    assert(map.size === 1000)
+    assert(map(null) === null)
+
+    map(null) = "-1"
+    assert(map.size === 1001)
+    assert(map(null) === "-1")
+
+    for (i <- 1 to 1000) {
+      assert(map(i.toString) === i.toString)
+    }
+
+    // Test iterator
+    val set = new HashSet[(String, String)]
+    for ((k, v) <- map) {
+      set.add((k, v))
+    }
+    val expected = (1 to 1000).map(_.toString).map(x => (x, x)) :+ (null.asInstanceOf[String], "-1")
+    assert(set === expected.toSet)
+  }
+
+  test("null keys") {
+    val map = new OpenHashMap[String, String]()
+    for (i <- 1 to 100) {
+      map("" + i) = "" + i
+    }
+    assert(map.size === 100)
+    assert(map(null) === null)
+    map(null) = "hello"
+    assert(map.size === 101)
+    assert(map(null) === "hello")
+  }
+
+  test("null values") {
+    val map = new OpenHashMap[String, String]()
+    for (i <- 1 to 100) {
+      map("" + i) = null
+    }
+    assert(map.size === 100)
+    assert(map("1") === null)
+    assert(map(null) === null)
+    assert(map.size === 100)
+    map(null) = null
+    assert(map.size === 101)
+    assert(map(null) === null)
+  }
+
+  test("changeValue") {
+    val map = new OpenHashMap[String, String]()
+    for (i <- 1 to 100) {
+      map("" + i) = "" + i
+    }
+    assert(map.size === 100)
+    for (i <- 1 to 100) {
+      val res = map.changeValue("" + i, { assert(false); "" }, v => {
+        assert(v === "" + i)
+        v + "!"
+      })
+      assert(res === i + "!")
+    }
+    // Insert keys 101 to 400 to make sure the map grows a couple of times, because we previously
+    // had a bug where changeValue returned the wrong result when the map grew on that insert
+    for (i <- 101 to 400) {
+      val res = map.changeValue("" + i, { i + "!" }, v => { assert(false); v })
+      assert(res === i + "!")
+    }
+    assert(map.size === 400)
+    assert(map(null) === null)
+    map.changeValue(null, { "null!" }, v => { assert(false); v })
+    assert(map.size === 401)
+    map.changeValue(null, { assert(false); "" }, v => {
+      assert(v === "null!")
+      "null!!"
+    })
+    assert(map.size === 401)
+  }
+
+  test("inserting in capacity-1 map") {
+    val map = new OpenHashMap[String, String](1)
+    for (i <- 1 to 100) {
+      map("" + i) = "" + i
+    }
+    assert(map.size === 100)
+    for (i <- 1 to 100) {
+      assert(map("" + i) === "" + i)
+    }
+  }
+}
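
Similarly, a small sketch of the OpenHashMap operations covered above; the three-argument changeValue (key, by-name default for an absent key, update function for a present key) is inferred from how the tests call it:

    import org.apache.spark.util.collection.OpenHashMap

    val counts = new OpenHashMap[String, Int]
    counts("spark") = 1                             // insert or update
    val n = counts("spark")                         // lookup
    counts.changeValue("spark", 1, v => v + 1)      // key present: apply the update function
    counts.changeValue("hadoop", 1, v => v + 1)     // key absent: insert the default value
    for ((k, v) <- counts) println(k + " -> " + v)  // iterate over key/value pairs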

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
new file mode 100644
index 0000000..40049e8
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/OpenHashSetSuite.scala
@@ -0,0 +1,74 @@
+package org.apache.spark.util.collection
+
+import org.scalatest.FunSuite
+
+
+class OpenHashSetSuite extends FunSuite {
+
+  test("primitive int") {
+    val set = new OpenHashSet[Int]
+    assert(set.size === 0)
+    set.add(10)
+    assert(set.size === 1)
+    set.add(50)
+    assert(set.size === 2)
+    set.add(999)
+    assert(set.size === 3)
+    set.add(50)
+    assert(set.size === 3)
+  }
+
+  test("primitive long") {
+    val set = new OpenHashSet[Long]
+    assert(set.size === 0)
+    set.add(10L)
+    assert(set.size === 1)
+    set.add(50L)
+    assert(set.size === 2)
+    set.add(999L)
+    assert(set.size === 3)
+    set.add(50L)
+    assert(set.size === 3)
+  }
+
+  test("non-primitive") {
+    val set = new OpenHashSet[String]
+    assert(set.size === 0)
+    set.add(10.toString)
+    assert(set.size === 1)
+    set.add(50.toString)
+    assert(set.size === 2)
+    set.add(999.toString)
+    assert(set.size === 3)
+    set.add(50.toString)
+    assert(set.size === 3)
+  }
+
+  test("non-primitive set growth") {
+    val set = new OpenHashSet[String]
+    for (i <- 1 to 1000) {
+      set.add(i.toString)
+    }
+    assert(set.size === 1000)
+    assert(set.capacity > 1000)
+    for (i <- 1 to 100) {
+      set.add(i.toString)
+    }
+    assert(set.size === 1000)
+    assert(set.capacity > 1000)
+  }
+
+  test("primitive set growth") {
+    val set = new OpenHashSet[Long]
+    for (i <- 1 to 1000) {
+      set.add(i.toLong)
+    }
+    assert(set.size === 1000)
+    assert(set.capacity > 1000)
+    for (i <- 1 to 100) {
+      set.add(i.toLong)
+    }
+    assert(set.size === 1000)
+    assert(set.capacity > 1000)
+  }
+}
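
A corresponding sketch for OpenHashSet, limited to the operations the suite exercises (add, size, capacity); growth of the backing array past the number of inserted elements is exactly what the two growth tests assert:

    import org.apache.spark.util.collection.OpenHashSet

    val ids = new OpenHashSet[Long]
    (1L to 2000L).foreach(x => ids.add(x))   // duplicates are ignored
    assert(ids.size == 2000)
    assert(ids.capacity > 2000)              // capacity has grown beyond the element count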

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala
new file mode 100644
index 0000000..dc7f6cb
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashSetSuite.scala
@@ -0,0 +1,90 @@
+package org.apache.spark.util.collection
+
+import scala.collection.mutable.HashSet
+import org.scalatest.FunSuite
+
+class PrimitiveKeyOpenHashSetSuite extends FunSuite {
+
+  test("initialization") {
+    val goodMap1 = new PrimitiveKeyOpenHashMap[Int, Int](1)
+    assert(goodMap1.size === 0)
+    val goodMap2 = new PrimitiveKeyOpenHashMap[Int, Int](255)
+    assert(goodMap2.size === 0)
+    val goodMap3 = new PrimitiveKeyOpenHashMap[Int, Int](256)
+    assert(goodMap3.size === 0)
+    intercept[IllegalArgumentException] {
+      new PrimitiveKeyOpenHashMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
+    }
+    intercept[IllegalArgumentException] {
+      new PrimitiveKeyOpenHashMap[Int, Int](-1)
+    }
+    intercept[IllegalArgumentException] {
+      new PrimitiveKeyOpenHashMap[Int, Int](0)
+    }
+  }
+
+  test("basic operations") {
+    val longBase = 1000000L
+    val map = new PrimitiveKeyOpenHashMap[Long, Int]
+
+    for (i <- 1 to 1000) {
+      map(i + longBase) = i
+      assert(map(i + longBase) === i)
+    }
+
+    assert(map.size === 1000)
+
+    for (i <- 1 to 1000) {
+      assert(map(i + longBase) === i)
+    }
+
+    // Test iterator
+    val set = new HashSet[(Long, Int)]
+    for ((k, v) <- map) {
+      set.add((k, v))
+    }
+    assert(set === (1 to 1000).map(x => (x + longBase, x)).toSet)
+  }
+
+  test("null values") {
+    val map = new PrimitiveKeyOpenHashMap[Long, String]()
+    for (i <- 1 to 100) {
+      map(i.toLong) = null
+    }
+    assert(map.size === 100)
+    assert(map(1.toLong) === null)
+  }
+
+  test("changeValue") {
+    val map = new PrimitiveKeyOpenHashMap[Long, String]()
+    for (i <- 1 to 100) {
+      map(i.toLong) = "" + i
+    }
+    assert(map.size === 100)
+    for (i <- 1 to 100) {
+      val res = map.changeValue(i.toLong, { assert(false); "" }, v => {
+        assert(v === "" + i)
+        v + "!"
+      })
+      assert(res === i + "!")
+    }
+    // Insert keys 101 to 400 to make sure the map grows a couple of times, because we previously
+    // had a bug where changeValue returned the wrong result when the map grew on that insert
+    for (i <- 101 to 400) {
+      val res = map.changeValue(i.toLong, { i + "!" }, v => { assert(false); v })
+      assert(res === i + "!")
+    }
+    assert(map.size === 400)
+  }
+
+  test("inserting in capacity-1 map") {
+    val map = new PrimitiveKeyOpenHashMap[Long, String](1)
+    for (i <- 1 to 100) {
+      map(i.toLong) = "" + i
+    }
+    assert(map.size === 100)
+    for (i <- 1 to 100) {
+      assert(map(i.toLong) === "" + i)
+    }
+  }
+}
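
And a sketch for PrimitiveKeyOpenHashMap with unboxed Long keys, mirroring the operations tested above; the key and value names here are illustrative only:

    import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap

    val byId = new PrimitiveKeyOpenHashMap[Long, String]
    byId(42L) = "answer"                                   // insert or update with a primitive key
    byId.changeValue(42L, "missing", v => v.toUpperCase)   // key present: returns the updated value
    byId.changeValue(7L, "default", v => v)                // key absent: inserts and returns the default
    for ((k, v) <- byId) println(k + " -> " + v)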

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala
deleted file mode 100644
index 41ede86..0000000
--- a/core/src/test/scala/org/apache/spark/util/hash/BitSetSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util.hash
-
-import org.scalatest.FunSuite
-
-
-class BitSetSuite extends FunSuite {
-
-  test("basic set and get") {
-    val setBits = Seq(0, 9, 1, 10, 90, 96)
-    val bitset = new BitSet(100)
-
-    for (i <- 0 until 100) {
-      assert(!bitset.get(i))
-    }
-
-    setBits.foreach(i => bitset.set(i))
-
-    for (i <- 0 until 100) {
-      if (setBits.contains(i)) {
-        assert(bitset.get(i))
-      } else {
-        assert(!bitset.get(i))
-      }
-    }
-    assert(bitset.cardinality() === setBits.size)
-  }
-
-  test("100% full bit set") {
-    val bitset = new BitSet(10000)
-    for (i <- 0 until 10000) {
-      assert(!bitset.get(i))
-      bitset.set(i)
-    }
-    for (i <- 0 until 10000) {
-      assert(bitset.get(i))
-    }
-    assert(bitset.cardinality() === 10000)
-  }
-
-  test("nextSetBit") {
-    val setBits = Seq(0, 9, 1, 10, 90, 96)
-    val bitset = new BitSet(100)
-    setBits.foreach(i => bitset.set(i))
-
-    assert(bitset.nextSetBit(0) === 0)
-    assert(bitset.nextSetBit(1) === 1)
-    assert(bitset.nextSetBit(2) === 9)
-    assert(bitset.nextSetBit(9) === 9)
-    assert(bitset.nextSetBit(10) === 10)
-    assert(bitset.nextSetBit(11) === 90)
-    assert(bitset.nextSetBit(80) === 90)
-    assert(bitset.nextSetBit(91) === 96)
-    assert(bitset.nextSetBit(96) === 96)
-    assert(bitset.nextSetBit(97) === -1)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala b/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala
deleted file mode 100644
index 355784d..0000000
--- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashMapSuite.scala
+++ /dev/null
@@ -1,148 +0,0 @@
-package org.apache.spark.util.hash
-
-import scala.collection.mutable.HashSet
-import org.scalatest.FunSuite
-
-class OpenHashMapSuite extends FunSuite {
-
-  test("initialization") {
-    val goodMap1 = new OpenHashMap[String, Int](1)
-    assert(goodMap1.size === 0)
-    val goodMap2 = new OpenHashMap[String, Int](255)
-    assert(goodMap2.size === 0)
-    val goodMap3 = new OpenHashMap[String, String](256)
-    assert(goodMap3.size === 0)
-    intercept[IllegalArgumentException] {
-      new OpenHashMap[String, Int](1 << 30) // Invalid map size: bigger than 2^29
-    }
-    intercept[IllegalArgumentException] {
-      new OpenHashMap[String, Int](-1)
-    }
-    intercept[IllegalArgumentException] {
-      new OpenHashMap[String, String](0)
-    }
-  }
-
-  test("primitive value") {
-    val map = new OpenHashMap[String, Int]
-
-    for (i <- 1 to 1000) {
-      map(i.toString) = i
-      assert(map(i.toString) === i)
-    }
-
-    assert(map.size === 1000)
-    assert(map(null) === 0)
-
-    map(null) = -1
-    assert(map.size === 1001)
-    assert(map(null) === -1)
-
-    for (i <- 1 to 1000) {
-      assert(map(i.toString) === i)
-    }
-
-    // Test iterator
-    val set = new HashSet[(String, Int)]
-    for ((k, v) <- map) {
-      set.add((k, v))
-    }
-    val expected = (1 to 1000).map(x => (x.toString, x)) :+ (null.asInstanceOf[String], -1)
-    assert(set === expected.toSet)
-  }
-
-  test("non-primitive value") {
-    val map = new OpenHashMap[String, String]
-
-    for (i <- 1 to 1000) {
-      map(i.toString) = i.toString
-      assert(map(i.toString) === i.toString)
-    }
-
-    assert(map.size === 1000)
-    assert(map(null) === null)
-
-    map(null) = "-1"
-    assert(map.size === 1001)
-    assert(map(null) === "-1")
-
-    for (i <- 1 to 1000) {
-      assert(map(i.toString) === i.toString)
-    }
-
-    // Test iterator
-    val set = new HashSet[(String, String)]
-    for ((k, v) <- map) {
-      set.add((k, v))
-    }
-    val expected = (1 to 1000).map(_.toString).map(x => (x, x)) :+ (null.asInstanceOf[String], "-1")
-    assert(set === expected.toSet)
-  }
-
-  test("null keys") {
-    val map = new OpenHashMap[String, String]()
-    for (i <- 1 to 100) {
-      map("" + i) = "" + i
-    }
-    assert(map.size === 100)
-    assert(map(null) === null)
-    map(null) = "hello"
-    assert(map.size === 101)
-    assert(map(null) === "hello")
-  }
-
-  test("null values") {
-    val map = new OpenHashMap[String, String]()
-    for (i <- 1 to 100) {
-      map("" + i) = null
-    }
-    assert(map.size === 100)
-    assert(map("1") === null)
-    assert(map(null) === null)
-    assert(map.size === 100)
-    map(null) = null
-    assert(map.size === 101)
-    assert(map(null) === null)
-  }
-
-  test("changeValue") {
-    val map = new OpenHashMap[String, String]()
-    for (i <- 1 to 100) {
-      map("" + i) = "" + i
-    }
-    assert(map.size === 100)
-    for (i <- 1 to 100) {
-      val res = map.changeValue("" + i, { assert(false); "" }, v => {
-        assert(v === "" + i)
-        v + "!"
-      })
-      assert(res === i + "!")
-    }
-    // Iterate from 101 to 400 to make sure the map grows a couple of times, because we had a
-    // bug where changeValue would return the wrong result when the map grew on that insert
-    for (i <- 101 to 400) {
-      val res = map.changeValue("" + i, { i + "!" }, v => { assert(false); v })
-      assert(res === i + "!")
-    }
-    assert(map.size === 400)
-    assert(map(null) === null)
-    map.changeValue(null, { "null!" }, v => { assert(false); v })
-    assert(map.size === 401)
-    map.changeValue(null, { assert(false); "" }, v => {
-      assert(v === "null!")
-      "null!!"
-    })
-    assert(map.size === 401)
-  }
-
-  test("inserting in capacity-1 map") {
-    val map = new OpenHashMap[String, String](1)
-    for (i <- 1 to 100) {
-      map("" + i) = "" + i
-    }
-    assert(map.size === 100)
-    for (i <- 1 to 100) {
-      assert(map("" + i) === "" + i)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala
deleted file mode 100644
index b5b3a4a..0000000
--- a/core/src/test/scala/org/apache/spark/util/hash/OpenHashSetSuite.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-package org.apache.spark.util.hash
-
-import org.scalatest.FunSuite
-
-
-class OpenHashSetSuite extends FunSuite {
-
-  test("primitive int") {
-    val set = new OpenHashSet[Int]
-    assert(set.size === 0)
-    set.add(10)
-    assert(set.size === 1)
-    set.add(50)
-    assert(set.size === 2)
-    set.add(999)
-    assert(set.size === 3)
-    set.add(50)
-    assert(set.size === 3)
-  }
-
-  test("primitive long") {
-    val set = new OpenHashSet[Long]
-    assert(set.size === 0)
-    set.add(10L)
-    assert(set.size === 1)
-    set.add(50L)
-    assert(set.size === 2)
-    set.add(999L)
-    assert(set.size === 3)
-    set.add(50L)
-    assert(set.size === 3)
-  }
-
-  test("non-primitive") {
-    val set = new OpenHashSet[String]
-    assert(set.size === 0)
-    set.add(10.toString)
-    assert(set.size === 1)
-    set.add(50.toString)
-    assert(set.size === 2)
-    set.add(999.toString)
-    assert(set.size === 3)
-    set.add(50.toString)
-    assert(set.size === 3)
-  }
-
-  test("non-primitive set growth") {
-    val set = new OpenHashSet[String]
-    for (i <- 1 to 1000) {
-      set.add(i.toString)
-    }
-    assert(set.size === 1000)
-    assert(set.capacity > 1000)
-    for (i <- 1 to 100) {
-      set.add(i.toString)
-    }
-    assert(set.size === 1000)
-    assert(set.capacity > 1000)
-  }
-
-  test("primitive set growth") {
-    val set = new OpenHashSet[Long]
-    for (i <- 1 to 1000) {
-      set.add(i.toLong)
-    }
-    assert(set.size === 1000)
-    assert(set.capacity > 1000)
-    for (i <- 1 to 100) {
-      set.add(i.toLong)
-    }
-    assert(set.size === 1000)
-    assert(set.capacity > 1000)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1e9543b5/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala b/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala
deleted file mode 100644
index b9a4b54..0000000
--- a/core/src/test/scala/org/apache/spark/util/hash/PrimitiveKeyOpenHashSetSuite.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-package org.apache.spark.util.hash
-
-import scala.collection.mutable.HashSet
-import org.scalatest.FunSuite
-
-class PrimitiveKeyOpenHashSetSuite extends FunSuite {
-
-  test("initialization") {
-    val goodMap1 = new PrimitiveKeyOpenHashMap[Int, Int](1)
-    assert(goodMap1.size === 0)
-    val goodMap2 = new PrimitiveKeyOpenHashMap[Int, Int](255)
-    assert(goodMap2.size === 0)
-    val goodMap3 = new PrimitiveKeyOpenHashMap[Int, Int](256)
-    assert(goodMap3.size === 0)
-    intercept[IllegalArgumentException] {
-      new PrimitiveKeyOpenHashMap[Int, Int](1 << 30) // Invalid map size: bigger than 2^29
-    }
-    intercept[IllegalArgumentException] {
-      new PrimitiveKeyOpenHashMap[Int, Int](-1)
-    }
-    intercept[IllegalArgumentException] {
-      new PrimitiveKeyOpenHashMap[Int, Int](0)
-    }
-  }
-
-  test("basic operations") {
-    val longBase = 1000000L
-    val map = new PrimitiveKeyOpenHashMap[Long, Int]
-
-    for (i <- 1 to 1000) {
-      map(i + longBase) = i
-      assert(map(i + longBase) === i)
-    }
-
-    assert(map.size === 1000)
-
-    for (i <- 1 to 1000) {
-      assert(map(i + longBase) === i)
-    }
-
-    // Test iterator
-    val set = new HashSet[(Long, Int)]
-    for ((k, v) <- map) {
-      set.add((k, v))
-    }
-    assert(set === (1 to 1000).map(x => (x + longBase, x)).toSet)
-  }
-
-  test("null values") {
-    val map = new PrimitiveKeyOpenHashMap[Long, String]()
-    for (i <- 1 to 100) {
-      map(i.toLong) = null
-    }
-    assert(map.size === 100)
-    assert(map(1.toLong) === null)
-  }
-
-  test("changeValue") {
-    val map = new PrimitiveKeyOpenHashMap[Long, String]()
-    for (i <- 1 to 100) {
-      map(i.toLong) = "" + i
-    }
-    assert(map.size === 100)
-    for (i <- 1 to 100) {
-      val res = map.changeValue(i.toLong, { assert(false); "" }, v => {
-        assert(v === "" + i)
-        v + "!"
-      })
-      assert(res === i + "!")
-    }
-    // Iterate from 101 to 400 to make sure the map grows a couple of times, because we had a
-    // bug where changeValue would return the wrong result when the map grew on that insert
-    for (i <- 101 to 400) {
-      val res = map.changeValue(i.toLong, { i + "!" }, v => { assert(false); v })
-      assert(res === i + "!")
-    }
-    assert(map.size === 400)
-  }
-
-  test("inserting in capacity-1 map") {
-    val map = new PrimitiveKeyOpenHashMap[Long, String](1)
-    for (i <- 1 to 100) {
-      map(i.toLong) = "" + i
-    }
-    assert(map.size === 100)
-    for (i <- 1 to 100) {
-      assert(map(i.toLong) === "" + i)
-    }
-  }
-}