Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:52 UTC

[08/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
new file mode 100644
index 0000000..88ba10f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import akka.actor._
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.matchers.ShouldMatchers._
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.JavaSerializer
+import org.apache.spark.KryoSerializer
+import org.apache.spark.SizeEstimator
+import org.apache.spark.Utils
+import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.ByteBufferInputStream
+
+
+class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
+  var store: BlockManager = null
+  var store2: BlockManager = null
+  var actorSystem: ActorSystem = null
+  var master: BlockManagerMaster = null
+  var oldArch: String = null
+  var oldOops: String = null
+  var oldHeartBeat: String = null
+
+  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+  System.setProperty("spark.kryoserializer.buffer.mb", "1")
+  val serializer = new KryoSerializer
+
+  before {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0)
+    this.actorSystem = actorSystem
+    System.setProperty("spark.driver.port", boundPort.toString)
+    System.setProperty("spark.hostPort", "localhost:" + boundPort)
+
+    master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
+
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    oldArch = System.setProperty("os.arch", "amd64")
+    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+    oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
+    // Set spark.hostPort to some arbitrary value (the exact port does not matter here)
+    System.setProperty("spark.hostPort", Utils.localHostName() + ":" + 1111)
+  }
+
+  after {
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+
+    if (store != null) {
+      store.stop()
+      store = null
+    }
+    if (store2 != null) {
+      store2.stop()
+      store2 = null
+    }
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+    actorSystem = null
+    master = null
+
+    if (oldArch != null) {
+      System.setProperty("os.arch", oldArch)
+    } else {
+      System.clearProperty("os.arch")
+    }
+
+    if (oldOops != null) {
+      System.setProperty("spark.test.useCompressedOops", oldOops)
+    } else {
+      System.clearProperty("spark.test.useCompressedOops")
+    }
+  }
+
+  test("StorageLevel object caching") {
+    val level1 = StorageLevel(false, false, false, 3)
+    val level2 = StorageLevel(false, false, false, 3) // this should return the same object as level1
+    val level3 = StorageLevel(false, false, false, 2) // this should return a different object
+    assert(level2 === level1, "level2 is not same as level1")
+    assert(level2.eq(level1), "level2 is not the same object as level1")
+    assert(level3 != level1, "level3 is same as level1")
+    val bytes1 = Utils.serialize(level1)
+    val level1_ = Utils.deserialize[StorageLevel](bytes1)
+    val bytes2 = Utils.serialize(level2)
+    val level2_ = Utils.deserialize[StorageLevel](bytes2)
+    assert(level1_ === level1, "Deserialized level1 not same as original level1")
+    assert(level1_.eq(level1), "Deserialized level1 not the same object as original level1")
+    assert(level2_ === level2, "Deserialized level2 not same as original level2")
+    assert(level2_.eq(level1), "Deserialized level2 not the same object as original level1")
+  }
+
+  test("BlockManagerId object caching") {
+    val id1 = BlockManagerId("e1", "XXX", 1, 0)
+    val id2 = BlockManagerId("e1", "XXX", 1, 0) // this should return the same object as id1
+    val id3 = BlockManagerId("e1", "XXX", 2, 0) // this should return a different object
+    assert(id2 === id1, "id2 is not same as id1")
+    assert(id2.eq(id1), "id2 is not the same object as id1")
+    assert(id3 != id1, "id3 is same as id1")
+    val bytes1 = Utils.serialize(id1)
+    val id1_ = Utils.deserialize[BlockManagerId](bytes1)
+    val bytes2 = Utils.serialize(id2)
+    val id2_ = Utils.deserialize[BlockManagerId](bytes2)
+    assert(id1_ === id1, "Deserialized id1 is not same as original id1")
+    assert(id1_.eq(id1), "Deserialized id1 is not the same object as original id1")
+    assert(id2_ === id2, "Deserialized id2 is not same as original id2")
+    assert(id2_.eq(id1), "Deserialized id2 is not the same object as original id1")
+  }
+
+  test("master + 1 manager interaction") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+
+    // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+
+    // Checking whether blocks are in memory
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+
+    // Checking whether master knows about the blocks or not
+    assert(master.getLocations("a1").size > 0, "master was not told about a1")
+    assert(master.getLocations("a2").size > 0, "master was not told about a2")
+    assert(master.getLocations("a3").size === 0, "master was told about a3")
+
+    // Drop a1 and a2 from memory; this should be reported back to the master
+    store.dropFromMemory("a1", null)
+    store.dropFromMemory("a2", null)
+    assert(store.getSingle("a1") === None, "a1 not removed from store")
+    assert(store.getSingle("a2") === None, "a2 not removed from store")
+    assert(master.getLocations("a1").size === 0, "master did not remove a1")
+    assert(master.getLocations("a2").size === 0, "master did not remove a2")
+  }
+
+  test("master + 2 managers interaction") {
+    store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+    store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000)
+
+    val peers = master.getPeers(store.blockManagerId, 1)
+    assert(peers.size === 1, "master did not return the other manager as a peer")
+    assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
+
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+    store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
+    assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
+    assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
+  }
+
+  test("removing block") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+
+    // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+
+    // Checking whether blocks are in memory and memory size
+    val memStatus = master.getMemoryStatus.head._2
+    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+    assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should be <= 1200")
+    assert(store.getSingle("a1-to-remove") != None, "a1 was not in store")
+    assert(store.getSingle("a2-to-remove") != None, "a2 was not in store")
+    assert(store.getSingle("a3-to-remove") != None, "a3 was not in store")
+
+    // Checking whether master knows about the blocks or not
+    assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
+    assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2")
+    assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3")
+
+    // Remove a1, a2 and a3. This should be a no-op for a3.
+    master.removeBlock("a1-to-remove")
+    master.removeBlock("a2-to-remove")
+    master.removeBlock("a3-to-remove")
+
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("a1-to-remove") should be (None)
+      master.getLocations("a1-to-remove") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("a2-to-remove") should be (None)
+      master.getLocations("a2-to-remove") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("a3-to-remove") should not be (None)
+      master.getLocations("a3-to-remove") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      val memStatus = master.getMemoryStatus.head._2
+      memStatus._1 should equal (2000L)
+      memStatus._2 should equal (2000L)
+    }
+  }
+
+  test("removing rdd") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    // Putting a1, a2 and a3 in memory.
+    store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+    master.removeRdd(0, blocking = false)
+
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("rdd_0_0") should be (None)
+      master.getLocations("rdd_0_0") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("rdd_0_1") should be (None)
+      master.getLocations("rdd_0_1") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("nonrddblock") should not be (None)
+      master.getLocations("nonrddblock") should have size (1)
+    }
+
+    store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+    master.removeRdd(0, blocking = true)
+    store.getSingle("rdd_0_0") should be (None)
+    master.getLocations("rdd_0_0") should have size 0
+    store.getSingle("rdd_0_1") should be (None)
+    master.getLocations("rdd_0_1") should have size 0
+  }
+
+  test("reregistration on heart beat") {
+    val heartBeat = PrivateMethod[Unit]('heartBeat)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(master.getLocations("a1").size > 0, "master was not told about a1")
+
+    master.removeExecutor(store.blockManagerId.executorId)
+    assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
+    store invokePrivate heartBeat()
+    assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+  }
+
+  test("reregistration on block update") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    assert(master.getLocations("a1").size > 0, "master was not told about a1")
+
+    master.removeExecutor(store.blockManagerId.executorId)
+    assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.waitForAsyncReregister()
+
+    assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+    assert(master.getLocations("a2").size > 0, "master was not told about a2")
+  }
+
+  test("reregistration doesn't dead lock") {
+    val heartBeat = PrivateMethod[Unit]('heartBeat)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = List(new Array[Byte](400))
+
+    // try many times to trigger any deadlocks
+    for (i <- 1 to 100) {
+      master.removeExecutor(store.blockManagerId.executorId)
+      val t1 = new Thread {
+        override def run() {
+          store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+        }
+      }
+      val t2 = new Thread {
+        override def run() {
+          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+        }
+      }
+      val t3 = new Thread {
+        override def run() {
+          store invokePrivate heartBeat()
+        }
+      }
+
+      t1.start()
+      t2.start()
+      t3.start()
+      t1.join()
+      t2.join()
+      t3.join()
+
+      store.dropFromMemory("a1", null)
+      store.dropFromMemory("a2", null)
+      store.waitForAsyncReregister()
+    }
+  }
+
+  test("in-memory LRU storage") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.getSingle("a1") === None, "a1 was in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    // At this point a2 was accessed last, so LRU should get rid of a3
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") === None, "a3 was in store")
+  }
+
+  test("in-memory LRU storage with serialization") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.getSingle("a1") === None, "a1 was in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    // At this point a2 was accessed last, so LRU should get rid of a3
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") === None, "a3 was in store")
+  }
+
+  test("in-memory LRU for partitions of same RDD") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
+    // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
+    // from the same RDD
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+    assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store")
+    // Check that rdd_0_3 doesn't replace them even after further accesses
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+  }
+
+  test("in-memory LRU for partitions of multiple RDDs") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    // At this point rdd_1_1 should've replaced rdd_0_1
+    assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
+    assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+    assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+    // Do a get() on rdd_0_2 so that it is the most recently used item
+    assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+    // Put in more partitions from RDD 0; they should replace rdd_1_1
+    store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
+    // when we try to add rdd_0_4.
+    assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
+    assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+    assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store")
+    assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+    assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store")
+  }
+
+  test("on-disk storage") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+    store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
+    assert(store.getSingle("a2") != None, "a2 was in store")
+    assert(store.getSingle("a3") != None, "a3 was in store")
+    assert(store.getSingle("a1") != None, "a1 was in store")
+  }
+
+  test("disk and memory storage") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
+  test("disk and memory storage with getLocalBytes") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
+    assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+    assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
+  test("disk and memory storage with serialization") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
+  test("disk and memory storage with serialization and getLocalBytes") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
+    assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+    assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
+  test("LRU with mixed storage levels") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    val a4 = new Array[Byte](400)
+    // First store a1 and a2, both in memory, and a3, on disk only
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
+    // At this point LRU should not kick in because a3 is only on disk
+    assert(store.getSingle("a1") != None, "a2 was not in store")
+    assert(store.getSingle("a2") != None, "a3 was not in store")
+    assert(store.getSingle("a3") != None, "a1 was not in store")
+    assert(store.getSingle("a1") != None, "a2 was not in store")
+    assert(store.getSingle("a2") != None, "a3 was not in store")
+    assert(store.getSingle("a3") != None, "a1 was not in store")
+    // Now let's add in a4, which uses both disk and memory; a1 should drop out
+    store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+    assert(store.getSingle("a1") == None, "a1 was in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.getSingle("a4") != None, "a4 was not in store")
+  }
+
+  test("in-memory LRU with streams") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val list1 = List(new Array[Byte](200), new Array[Byte](200))
+    val list2 = List(new Array[Byte](200), new Array[Byte](200))
+    val list3 = List(new Array[Byte](200), new Array[Byte](200))
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.get("list2") != None, "list2 was not in store")
+    assert(store.get("list2").get.size == 2)
+    assert(store.get("list3") != None, "list3 was not in store")
+    assert(store.get("list3").get.size == 2)
+    assert(store.get("list1") === None, "list1 was in store")
+    assert(store.get("list2") != None, "list2 was not in store")
+    assert(store.get("list2").get.size == 2)
+    // At this point list2 was accessed last, so LRU should get rid of list3
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.get("list1") != None, "list1 was not in store")
+    assert(store.get("list1").get.size == 2)
+    assert(store.get("list2") != None, "list2 was not in store")
+    assert(store.get("list2").get.size == 2)
+    assert(store.get("list3") === None, "list1 was in store")
+  }
+
+  test("LRU with mixed storage levels and streams") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val list1 = List(new Array[Byte](200), new Array[Byte](200))
+    val list2 = List(new Array[Byte](200), new Array[Byte](200))
+    val list3 = List(new Array[Byte](200), new Array[Byte](200))
+    val list4 = List(new Array[Byte](200), new Array[Byte](200))
+    // First store list1 and list2, both in memory, and list3, on disk only
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    // At this point LRU should not kick in because list3 is only on disk
+    assert(store.get("list1") != None, "list2 was not in store")
+    assert(store.get("list1").get.size === 2)
+    assert(store.get("list2") != None, "list3 was not in store")
+    assert(store.get("list2").get.size === 2)
+    assert(store.get("list3") != None, "list1 was not in store")
+    assert(store.get("list3").get.size === 2)
+    assert(store.get("list1") != None, "list2 was not in store")
+    assert(store.get("list1").get.size === 2)
+    assert(store.get("list2") != None, "list3 was not in store")
+    assert(store.get("list2").get.size === 2)
+    assert(store.get("list3") != None, "list1 was not in store")
+    assert(store.get("list3").get.size === 2)
+    // Now let's add in list4, which uses both disk and memory; list1 should drop out
+    store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+    assert(store.get("list1") === None, "list1 was in store")
+    assert(store.get("list2") != None, "list3 was not in store")
+    assert(store.get("list2").get.size === 2)
+    assert(store.get("list3") != None, "list1 was not in store")
+    assert(store.get("list3").get.size === 2)
+    assert(store.get("list4") != None, "list4 was not in store")
+    assert(store.get("list4").get.size === 2)
+  }
+
+  test("negative byte values in ByteBufferInputStream") {
+    val buffer = ByteBuffer.wrap(Array[Int](254, 255, 0, 1, 2).map(_.toByte).toArray)
+    val stream = new ByteBufferInputStream(buffer)
+    val temp = new Array[Byte](10)
+    assert(stream.read() === 254, "unexpected byte read")
+    assert(stream.read() === 255, "unexpected byte read")
+    assert(stream.read() === 0, "unexpected byte read")
+    assert(stream.read(temp, 0, temp.length) === 2, "unexpected number of bytes read")
+    assert(stream.read() === -1, "end of stream not signalled")
+    assert(stream.read(temp, 0, temp.length) === -1, "end of stream not signalled")
+  }
+
+  test("overly large block") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 500)
+    store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+    assert(store.getSingle("a1") === None, "a1 was in store")
+    store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
+    assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+  }
+
+  test("block compression") {
+    try {
+      System.setProperty("spark.shuffle.compress", "true")
+      store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+      store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.shuffle.compress", "false")
+      store = new BlockManager("exec2", actorSystem, master, serializer, 2000)
+      store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.broadcast.compress", "true")
+      store = new BlockManager("exec3", actorSystem, master, serializer, 2000)
+      store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.broadcast.compress", "false")
+      store = new BlockManager("exec4", actorSystem, master, serializer, 2000)
+      store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.rdd.compress", "true")
+      store = new BlockManager("exec5", actorSystem, master, serializer, 2000)
+      store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.rdd.compress", "false")
+      store = new BlockManager("exec6", actorSystem, master, serializer, 2000)
+      store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
+      store.stop()
+      store = null
+
+      // Check that any other block types are also kept uncompressed
+      store = new BlockManager("exec7", actorSystem, master, serializer, 2000)
+      store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+      assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
+      store.stop()
+      store = null
+    } finally {
+      System.clearProperty("spark.shuffle.compress")
+      System.clearProperty("spark.broadcast.compress")
+      System.clearProperty("spark.rdd.compress")
+    }
+  }
+
+  test("block store put failure") {
+    // Use the Java serializer so that putting a non-serializable object fails.
+    store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200)
+
+    // The put should fail since a1 is not serializable.
+    class UnserializableClass
+    val a1 = new UnserializableClass
+    intercept[java.io.NotSerializableException] {
+      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+    }
+
+    // Make sure that getting a1 doesn't hang and that it returns None.
+    failAfter(1 second) {
+      assert(store.getSingle("a1") == None, "a1 should not be in store")
+    }
+  }
+}
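
For reference, the object-caching contract that the "StorageLevel object caching" and "BlockManagerId object caching" tests above rely on -- constructing or deserializing an equal instance hands back the cached object, so both === and eq comparisons pass -- can be sketched roughly as follows. This is a hypothetical, simplified example (CachedLevel, CachedLevelDemo and roundTrip are invented names), not the actual org.apache.spark.storage implementation:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.ConcurrentHashMap

class CachedLevel private (val useDisk: Boolean, val useMemory: Boolean, val replication: Int)
  extends Serializable {

  override def equals(other: Any): Boolean = other match {
    case l: CachedLevel =>
      l.useDisk == useDisk && l.useMemory == useMemory && l.replication == replication
    case _ => false
  }

  override def hashCode: Int = (useDisk, useMemory, replication).hashCode

  // Java serialization calls this on deserialization, so a deserialized instance
  // is replaced by the canonical cached one and `eq` comparisons still hold.
  private def readResolve(): AnyRef = CachedLevel(useDisk, useMemory, replication)
}

object CachedLevel {
  private val cache = new ConcurrentHashMap[CachedLevel, CachedLevel]()

  def apply(useDisk: Boolean, useMemory: Boolean, replication: Int): CachedLevel = {
    val level = new CachedLevel(useDisk, useMemory, replication)
    val existing = cache.putIfAbsent(level, level)
    if (existing == null) level else existing
  }
}

object CachedLevelDemo {
  private def roundTrip[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    val a = CachedLevel(false, true, 2)
    val b = CachedLevel(false, true, 2)
    assert(b eq a)            // construction returns the cached instance
    assert(roundTrip(a) eq a) // deserialization resolves to the cached instance
  }
}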

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
new file mode 100644
index 0000000..3321fb5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import scala.util.{Failure, Success, Try}
+import java.net.ServerSocket
+import org.scalatest.FunSuite
+import org.eclipse.jetty.server.Server
+
+class UISuite extends FunSuite {
+  test("jetty port increases under contention") {
+    val startPort = 3030
+    val server = new Server(startPort)
+    server.start()
+    val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
+    val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
+
+    // Allow some wiggle room in case ports on the machine are under contention
+    assert(boundPort1 > startPort && boundPort1 < startPort + 10)
+    assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
+  }
+
+  test("jetty binds to port 0 correctly") {
+    val (jettyServer, boundPort) = JettyUtils.startJettyServer("localhost", 0, Seq())
+    assert(jettyServer.getState === "STARTED")
+    assert(boundPort != 0)
+    Try { new ServerSocket(boundPort) } match {
+      case Success(s) => fail("Port %s doesn't seem to be in use by the jetty server".format(boundPort))
+      case Failure(e) =>
+    }
+  }
+}
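
The port-contention behaviour asserted above (a server whose requested port is taken binds a nearby higher port instead) can be sketched in simplified, hypothetical form -- PortRetry and bindWithRetry are invented names, not the JettyUtils API -- as:

import java.net.{BindException, ServerSocket}

object PortRetry {
  // Try the requested port first; on a bind failure, walk upwards a bounded
  // number of times until a free port is found.
  def bindWithRetry(startPort: Int, retriesLeft: Int = 10): ServerSocket =
    try {
      new ServerSocket(startPort)
    } catch {
      case _: BindException if retriesLeft > 0 =>
        bindWithRetry(startPort + 1, retriesLeft - 1)
    }

  def main(args: Array[String]): Unit = {
    val s1 = bindWithRetry(3030)
    val s2 = bindWithRetry(3030) // 3030 is now taken, so this lands on a higher port
    assert(s2.getLocalPort > s1.getLocalPort)
    s1.close(); s2.close()
  }
}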

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala b/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala
new file mode 100644
index 0000000..6364246
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+/**
+ * Tests for the summary statistics and quantiles reported by [[Distribution]].
+ */
+
+class DistributionSuite extends FunSuite with ShouldMatchers {
+  test("summary") {
+    val d = new Distribution((1 to 100).toArray.map{_.toDouble})
+    val stats = d.statCounter
+    stats.count should be (100)
+    stats.mean should be (50.5)
+    stats.sum should be (50 * 101)
+
+    val quantiles = d.getQuantiles()
+    quantiles(0) should be (1)
+    quantiles(1) should be (26)
+    quantiles(2) should be (51)
+    quantiles(3) should be (76)
+    quantiles(4) should be (100)
+  }
+}
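
The quantile values checked above (1, 26, 51, 76, 100 for the data 1 to 100) follow from picking the 0th, 25th, 50th, 75th and 100th percentiles of the sorted data by index. A minimal sketch of that calculation, assuming simple index-based quantiles rather than the exact Distribution implementation (QuantileSketch is an invented name), is:

object QuantileSketch {
  // Index-based quantiles: sort the data and pick the element at floor(p * n),
  // clamping to the last element for p = 1.0.
  def quantiles(data: Array[Double],
                probs: Seq[Double] = Seq(0.0, 0.25, 0.5, 0.75, 1.0)): Seq[Double] = {
    val sorted = data.sorted
    probs.map { p =>
      val idx = math.min((p * sorted.length).toInt, sorted.length - 1)
      sorted(idx)
    }
  }

  def main(args: Array[String]): Unit = {
    val data = (1 to 100).map(_.toDouble).toArray
    println(quantiles(data)) // List(1.0, 26.0, 51.0, 76.0, 100.0), matching the test above
  }
}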

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/util/FakeClock.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FakeClock.scala b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
new file mode 100644
index 0000000..0a45917
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+class FakeClock extends Clock {
+  private var time = 0L
+
+  def advance(millis: Long): Unit = time += millis
+
+  def getTime(): Long = time
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
new file mode 100644
index 0000000..4586746
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import scala.collection.mutable.Buffer
+import java.util.NoSuchElementException
+
+class NextIteratorSuite extends FunSuite with ShouldMatchers {
+  test("one iteration") {
+    val i = new StubIterator(Buffer(1))
+    i.hasNext should be === true
+    i.next should be === 1
+    i.hasNext should be === false
+    intercept[NoSuchElementException] { i.next() }
+  }
+  
+  test("two iterations") {
+    val i = new StubIterator(Buffer(1, 2))
+    i.hasNext should be === true
+    i.next should be === 1
+    i.hasNext should be === true
+    i.next should be === 2
+    i.hasNext should be === false
+    intercept[NoSuchElementException] { i.next() }
+  }
+
+  test("empty iteration") {
+    val i = new StubIterator(Buffer())
+    i.hasNext should be === false
+    intercept[NoSuchElementException] { i.next() }
+  }
+
+  test("close is called once for empty iterations") {
+    val i = new StubIterator(Buffer())
+    i.hasNext should be === false
+    i.hasNext should be === false
+    i.closeCalled should be === 1
+  }
+
+  test("close is called once for non-empty iterations") {
+    val i = new StubIterator(Buffer(1, 2))
+    i.next should be === 1
+    i.next should be === 2
+    // close isn't called until we check for the next element
+    i.closeCalled should be === 0
+    i.hasNext should be === false
+    i.closeCalled should be === 1
+    i.hasNext should be === false
+    i.closeCalled should be === 1
+  }
+
+  class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] {
+    var closeCalled = 0
+    
+    override def getNext() = {
+      if (ints.size == 0) {
+        finished = true
+        0
+      } else {
+        ints.remove(0)
+      }
+    }
+
+    override def close() {
+      closeCalled += 1
+    }
+  }
+}
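
The contract these tests pin down -- getNext() produces elements and signals exhaustion through a `finished` flag, next() throws NoSuchElementException once exhausted, and close() runs exactly once, the first time hasNext sees that nothing is left -- can be sketched in simplified, hypothetical form (SimpleNextIterator is an invented name, not the real org.apache.spark.util.NextIterator) as:

abstract class SimpleNextIterator[U] extends Iterator[U] {
  private var gotNext = false
  private var closed = false
  private var nextValue: U = _

  // Subclasses set this to true from getNext() when there are no more elements.
  protected var finished = false

  protected def getNext(): U
  protected def close(): Unit

  override def hasNext: Boolean = {
    if (!gotNext && !finished) {
      nextValue = getNext()
      if (finished && !closed) {
        closed = true
        close() // called exactly once, on the first hasNext that finds no element
      }
      gotNext = true
    }
    !finished
  }

  override def next(): U = {
    if (!hasNext) throw new java.util.NoSuchElementException("End of stream")
    gotNext = false
    nextValue
  }
}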

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
new file mode 100644
index 0000000..a9dd0b1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.scalatest.FunSuite
+import java.io.ByteArrayOutputStream
+import java.util.concurrent.TimeUnit._
+
+class RateLimitedOutputStreamSuite extends FunSuite {
+
+  private def benchmark[U](f: => U): Long = {
+    val start = System.nanoTime
+    f
+    System.nanoTime - start
+  }
+
+  test("write") {
+    val underlying = new ByteArrayOutputStream
+    val data = "X" * 41000
+    val stream = new RateLimitedOutputStream(underlying, 10000)
+    val elapsedNs = benchmark { stream.write(data.getBytes("UTF-8")) }
+    assert(SECONDS.convert(elapsedNs, NANOSECONDS) == 4)
+    assert(underlying.toString("UTF-8") == data)
+  }
+}
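
The timing assertion above (41000 bytes through a 10000 bytes/sec stream takes about 4 seconds) is the behaviour of a stream that throttles writes to a per-second byte budget. A simplified, hypothetical sketch of such a stream (SimpleRateLimitedOutputStream is an invented name, not the actual org.apache.spark.util.RateLimitedOutputStream) is:

import java.io.OutputStream

class SimpleRateLimitedOutputStream(out: OutputStream, bytesPerSec: Int) extends OutputStream {
  private var windowStart = System.currentTimeMillis
  private var writtenInWindow = 0L

  override def write(b: Int): Unit = {
    throttle(1)
    out.write(b)
  }

  override def write(bytes: Array[Byte], offset: Int, length: Int): Unit = {
    var pos = offset
    val end = offset + length
    while (pos < end) {
      // Never ask for more than one window's worth of budget at a time.
      val chunk = math.min(end - pos, bytesPerSec)
      throttle(chunk)
      out.write(bytes, pos, chunk)
      pos += chunk
    }
  }

  // Block until `n` more bytes fit into the current one-second window.
  private def throttle(n: Int): Unit = {
    while (writtenInWindow + n > bytesPerSec) {
      val elapsed = System.currentTimeMillis - windowStart
      if (elapsed >= 1000) {
        windowStart = System.currentTimeMillis
        writtenInWindow = 0
      } else {
        Thread.sleep(1000 - elapsed)
      }
    }
    writtenInWindow += n
  }
}

With this scheme the first 10000 bytes go out immediately and each later 10000-byte chunk waits for the next one-second window, so the 41000-byte write in the test above spans roughly four seconds.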

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
deleted file mode 100644
index 0af175f..0000000
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-import collection.mutable
-import java.util.Random
-import scala.math.exp
-import scala.math.signum
-import spark.SparkContext._
-
-class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
-
-  test ("basic accumulation"){
-    sc = new SparkContext("local", "test")
-    val acc : Accumulator[Int] = sc.accumulator(0)
-
-    val d = sc.parallelize(1 to 20)
-    d.foreach{x => acc += x}
-    acc.value should be (210)
-
-
-    val longAcc = sc.accumulator(0l)
-    val maxInt = Integer.MAX_VALUE.toLong
-    d.foreach{x => longAcc += maxInt + x}
-    longAcc.value should be (210l + maxInt * 20)
-  }
-
-  test ("value not assignable from tasks") {
-    sc = new SparkContext("local", "test")
-    val acc : Accumulator[Int] = sc.accumulator(0)
-
-    val d = sc.parallelize(1 to 20)
-    evaluating {d.foreach{x => acc.value = x}} should produce [Exception]
-  }
-
-  test ("add value to collection accumulators") {
-    import SetAccum._
-    val maxI = 1000
-    for (nThreads <- List(1, 10)) { //test single & multi-threaded
-      sc = new SparkContext("local[" + nThreads + "]", "test")
-      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
-      val d = sc.parallelize(1 to maxI)
-      d.foreach {
-        x => acc += x
-      }
-      val v = acc.value.asInstanceOf[mutable.Set[Int]]
-      for (i <- 1 to maxI) {
-        v should contain(i)
-      }
-      resetSparkContext()
-    }
-  }
-
-  implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
-    def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
-      t1 ++= t2
-      t1
-    }
-    def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
-      t1 += t2
-      t1
-    }
-    def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
-      new mutable.HashSet[Any]()
-    }
-  }
-
-  test ("value not readable in tasks") {
-    import SetAccum._
-    val maxI = 1000
-    for (nThreads <- List(1, 10)) { //test single & multi-threaded
-      sc = new SparkContext("local[" + nThreads + "]", "test")
-      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
-      val d = sc.parallelize(1 to maxI)
-      evaluating {
-        d.foreach {
-          x => acc.value += x
-        }
-      } should produce [SparkException]
-      resetSparkContext()
-    }
-  }
-
-  test ("collection accumulators") {
-    val maxI = 1000
-    for (nThreads <- List(1, 10)) {
-      // test single & multi-threaded
-      sc = new SparkContext("local[" + nThreads + "]", "test")
-      val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
-      val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
-      val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
-      val d = sc.parallelize((1 to maxI) ++ (1 to maxI))
-      d.foreach {
-        x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)}
-      }
-
-      // Note that this is typed correctly -- no casts necessary
-      setAcc.value.size should be (maxI)
-      bufferAcc.value.size should be (2 * maxI)
-      mapAcc.value.size should be (maxI)
-      for (i <- 1 to maxI) {
-        setAcc.value should contain(i)
-        bufferAcc.value should contain(i)
-        mapAcc.value should contain (i -> i.toString)
-      }
-      resetSparkContext()
-    }
-  }
-
-  test ("localValue readable in tasks") {
-    import SetAccum._
-    val maxI = 1000
-    for (nThreads <- List(1, 10)) { //test single & multi-threaded
-      sc = new SparkContext("local[" + nThreads + "]", "test")
-      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
-      val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
-      val d = sc.parallelize(groupedInts)
-      d.foreach {
-        x => acc.localValue ++= x
-      }
-      acc.value should be ( (0 to maxI).toSet)
-      resetSparkContext()
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
deleted file mode 100644
index 785721e..0000000
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-
-class BroadcastSuite extends FunSuite with LocalSparkContext {
-  
-  test("basic broadcast") {
-    sc = new SparkContext("local", "test")
-    val list = List(1, 2, 3, 4)
-    val listBroadcast = sc.broadcast(list)
-    val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
-    assert(results.collect.toSet === Set((1, 10), (2, 10)))
-  }
-
-  test("broadcast variables accessed in multiple threads") {
-    sc = new SparkContext("local[10]", "test")
-    val list = List(1, 2, 3, 4)
-    val listBroadcast = sc.broadcast(list)
-    val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
-    assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
deleted file mode 100644
index 966dede..0000000
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import java.io.File
-import spark.rdd._
-import spark.SparkContext._
-import storage.StorageLevel
-
-class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
-  initLogging()
-
-  var checkpointDir: File = _
-  val partitioner = new HashPartitioner(2)
-
-  override def beforeEach() {
-    super.beforeEach()
-    checkpointDir = File.createTempFile("temp", "")
-    checkpointDir.delete()
-    sc = new SparkContext("local", "test")
-    sc.setCheckpointDir(checkpointDir.toString)
-  }
-
-  override def afterEach() {
-    super.afterEach()
-    if (checkpointDir != null) {
-      checkpointDir.delete()
-    }
-  }
-
-  test("basic checkpointing") {
-    val parCollection = sc.makeRDD(1 to 4)
-    val flatMappedRDD = parCollection.flatMap(x => 1 to x)
-    flatMappedRDD.checkpoint()
-    assert(flatMappedRDD.dependencies.head.rdd == parCollection)
-    val result = flatMappedRDD.collect()
-    assert(flatMappedRDD.dependencies.head.rdd != parCollection)
-    assert(flatMappedRDD.collect() === result)
-  }
-
-  test("RDDs with one-to-one dependencies") {
-    testCheckpointing(_.map(x => x.toString))
-    testCheckpointing(_.flatMap(x => 1 to x))
-    testCheckpointing(_.filter(_ % 2 == 0))
-    testCheckpointing(_.sample(false, 0.5, 0))
-    testCheckpointing(_.glom())
-    testCheckpointing(_.mapPartitions(_.map(_.toString)))
-    testCheckpointing(r => new MapPartitionsWithIndexRDD(r,
-      (i: Int, iter: Iterator[Int]) => iter.map(_.toString), false ))
-    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
-    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
-    testCheckpointing(_.pipe(Seq("cat")))
-  }
-
-  test("ParallelCollection") {
-    val parCollection = sc.makeRDD(1 to 4, 2)
-    val numPartitions = parCollection.partitions.size
-    parCollection.checkpoint()
-    assert(parCollection.dependencies === Nil)
-    val result = parCollection.collect()
-    assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result)
-    assert(parCollection.dependencies != Nil)
-    assert(parCollection.partitions.length === numPartitions)
-    assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList)
-    assert(parCollection.collect() === result)
-  }
-
-  test("BlockRDD") {
-    val blockId = "id"
-    val blockManager = SparkEnv.get.blockManager
-    blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY)
-    val blockRDD = new BlockRDD[String](sc, Array(blockId))
-    val numPartitions = blockRDD.partitions.size
-    blockRDD.checkpoint()
-    val result = blockRDD.collect()
-    assert(sc.checkpointFile[String](blockRDD.getCheckpointFile.get).collect() === result)
-    assert(blockRDD.dependencies != Nil)
-    assert(blockRDD.partitions.length === numPartitions)
-    assert(blockRDD.partitions.toList === blockRDD.checkpointData.get.getPartitions.toList)
-    assert(blockRDD.collect() === result)
-  }
-
-  test("ShuffledRDD") {
-    testCheckpointing(rdd => {
-      // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
-      new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
-    })
-  }
-
-  test("UnionRDD") {
-    def otherRDD = sc.makeRDD(1 to 10, 1)
-
-    // Test whether the size of UnionRDDPartitions reduce in size after parent RDD is checkpointed.
-    // Current implementation of UnionRDD has transient reference to parent RDDs,
-    // so only the partitions will reduce in serialized size, not the RDD.
-    testCheckpointing(_.union(otherRDD), false, true)
-    testParentCheckpointing(_.union(otherRDD), false, true)
-  }
-
-  test("CartesianRDD") {
-    def otherRDD = sc.makeRDD(1 to 10, 1)
-    testCheckpointing(new CartesianRDD(sc, _, otherRDD))
-
-    // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
-    // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false)
-
-    // Test that the CartesianRDD updates its parent partitions (CartesianPartition.s1/s2) after
-    // the parent RDD has been checkpointed and its partitions have been changed to HadoopPartitions.
-    // Note that this test is very specific to the current implementation of CartesianRDD.
-    val ones = sc.makeRDD(1 to 100, 10).map(x => x)
-    ones.checkpoint() // checkpoint that MappedRDD
-    val cartesian = new CartesianRDD(sc, ones, ones)
-    val splitBeforeCheckpoint =
-      serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
-    cartesian.count() // do the checkpointing
-    val splitAfterCheckpoint =
-      serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
-    assert(
-      (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) &&
-        (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2),
-      "CartesianRDD.parents not updated after parent RDD checkpointed"
-    )
-  }
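
The s1/s2 comparison above depends on a detail worth calling out: the partition refreshes its
cached references to the parent partitions at the moment it is serialized (via a custom
writeObject), so a task shipped after checkpointing sees the new parent partitions. A small
sketch of that pattern outside Spark, with illustrative class names and the same writeObject
idiom assumed:

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException,
      ObjectInputStream, ObjectOutputStream}

    // A mutable stand-in for "the parent RDD's current partitions".
    class ParentState extends Serializable {
      var current: String = "before-checkpoint"
    }

    // Caches a value from the parent and refreshes the cache just before serialization.
    class RefreshingSplit(@transient val parent: ParentState) extends Serializable {
      var snapshot: String = parent.current

      @throws(classOf[IOException])
      private def writeObject(oos: ObjectOutputStream) {
        snapshot = parent.current  // pick up the parent's latest state at serialization time
        oos.defaultWriteObject()
      }
    }

    object RefreshingSplitSketch {
      def roundTrip[T](obj: T): T = {
        val bos = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(bos)
        oos.writeObject(obj)
        oos.close()
        val ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
        ois.readObject().asInstanceOf[T]
      }

      def main(args: Array[String]) {
        val parent = new ParentState
        val split = new RefreshingSplit(parent)
        parent.current = "after-checkpoint"
        // The deserialized copy carries the updated parent state, not the stale snapshot.
        println(roundTrip(split).snapshot)  // prints "after-checkpoint"
      }
    }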
-
-  test("CoalescedRDD") {
-    testCheckpointing(_.coalesce(2))
-
-    // Test whether the CoalescedRDD reduces in serialized size after the parent RDD is checkpointed.
-    // The current implementation of CoalescedRDDPartition has a transient reference to the parent RDD,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(_.coalesce(2), true, false)
-
-    // Test that the CoalescedRDDPartition updates its parent partitions (CoalescedRDDPartition.parents)
-    // after the parent RDD has been checkpointed and its partitions have been changed to HadoopPartitions.
-    // Note that this test is very specific to the current implementation of CoalescedRDDPartition.
-    val ones = sc.makeRDD(1 to 100, 10).map(x => x)
-    ones.checkpoint() // checkpoint that MappedRDD
-    val coalesced = new CoalescedRDD(ones, 2)
-    val splitBeforeCheckpoint =
-      serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
-    coalesced.count() // do the checkpointing
-    val splitAfterCheckpoint =
-      serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
-    assert(
-      splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head,
-      "CoalescedRDDPartition.parents not updated after parent RDD checkpointed"
-    )
-  }
-
-  test("CoGroupedRDD") {
-    val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD()
-    testCheckpointing(rdd => {
-      CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
-    }, false, true)
-
-    val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD()
-    testParentCheckpointing(rdd => {
-      CheckpointSuite.cogroup(
-        longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
-    }, false, true)
-  }
-
-  test("ZippedRDD") {
-    testCheckpointing(
-      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
-
-    // Test whether the ZippedRDD reduces in serialized size after the parent RDDs are checkpointed.
-    // In the current implementation, ZippedRDD partitions hold transient references to the parent RDDs,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(
-      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
-  }
-
-  test("CheckpointRDD with zero partitions") {
-    val rdd = new BlockRDD[Int](sc, Array[String]())
-    assert(rdd.partitions.size === 0)
-    assert(rdd.isCheckpointed === false)
-    rdd.checkpoint()
-    assert(rdd.count() === 0)
-    assert(rdd.isCheckpointed === true)
-    assert(rdd.partitions.size === 0)
-  }
-
-  /**
-   * Test checkpointing of the final RDD generated by the given operation. By default,
-   * this method tests whether the serialized size of the RDD has reduced after checkpointing.
-   * It can also test whether the serialized size of the RDD's partitions has reduced, but this
-   * is not done by default, since the partitions usually do not refer to any RDD and therefore
-   * never store the lineage.
-   */
-  def testCheckpointing[U: ClassManifest](
-      op: (RDD[Int]) => RDD[U],
-      testRDDSize: Boolean = true,
-      testRDDPartitionSize: Boolean = false
-    ) {
-    // Generate the final RDD using given RDD operation
-    val baseRDD = generateLongLineageRDD()
-    val operatedRDD = op(baseRDD)
-    val parentRDD = operatedRDD.dependencies.headOption.orNull
-    val rddType = operatedRDD.getClass.getSimpleName
-    val numPartitions = operatedRDD.partitions.length
-
-    // Find serialized sizes before and after the checkpoint
-    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
-    operatedRDD.checkpoint()
-    val result = operatedRDD.collect()
-    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
-
-    // Test whether the checkpoint file has been created
-    assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
-
-    // Test whether dependencies have been changed from its earlier parent RDD
-    assert(operatedRDD.dependencies.head.rdd != parentRDD)
-
-    // Test whether the partitions have been changed to the new Hadoop partitions
-    assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
-
-    // Test whether the number of partitions is same as before
-    assert(operatedRDD.partitions.length === numPartitions)
-
-    // Test whether the data in the checkpointed RDD is same as original
-    assert(operatedRDD.collect() === result)
-
-    // Test whether the serialized size of the RDD has reduced. If the RDD
-    // does not have any dependency on another RDD (e.g., ParallelCollection,
-    // ShuffledRDD with a ShuffleDependency), it may not reduce in size after checkpointing.
-    if (testRDDSize) {
-      logInfo("Size of " + rddType +
-        "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
-      assert(
-        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
-        "Size of " + rddType + " did not reduce after checkpointing " +
-          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
-      )
-    }
-
-    // Test whether the serialized size of the partitions has reduced. If the partitions
-    // do not hold any non-transient reference to another RDD or another RDD's partitions, they
-    // do not capture the lineage and therefore may not reduce in size after checkpointing.
-    // However, if the original partitions did refer to a parent RDD before checkpointing, they
-    // must be discarded after checkpointing (to remove all references to parent RDDs) and
-    // replaced with the HadoopPartitions of the checkpointed RDD.
-    if (testRDDPartitionSize) {
-      logInfo("Size of " + rddType + " partitions "
-        + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
-      assert(
-        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
-        "Size of " + rddType + " partitions did not reduce after checkpointing " +
-          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
-      )
-    }
-  }
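
Stripped of the helper plumbing, the core check above can be sketched with just the pieces this
file already uses (generateLongLineageRDD, Utils.serialize, checkpoint, and an action); this
simplified version ignores the checkpointData adjustment that getSerializedSizes makes:

    // Sketch only: assumes a running SparkContext `sc` with a checkpoint directory set,
    // as in this suite's setup.
    val rdd = generateLongLineageRDD().map(_.toString)
    val sizeBefore = Utils.serialize(rdd).length
    rdd.checkpoint()
    rdd.collect()                      // an action forces the checkpoint to be written
    val sizeAfter = Utils.serialize(rdd).length
    assert(sizeAfter < sizeBefore)     // the long map lineage no longer travels with the RDD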
-
-  /**
-   * Test whether checkpointing the parent of the generated RDD also truncates the lineage.
-   * Some RDDs, like CoGroupedRDD, hold on to their parent RDDs' partitions. So even if the
-   * parent RDD is checkpointed and its partitions are changed, the generated RDD will remember
-   * the old partitions and therefore potentially the whole lineage.
-   */
-  def testParentCheckpointing[U: ClassManifest](
-      op: (RDD[Int]) => RDD[U],
-      testRDDSize: Boolean,
-      testRDDPartitionSize: Boolean
-    ) {
-    // Generate the final RDD using given RDD operation
-    val baseRDD = generateLongLineageRDD()
-    val operatedRDD = op(baseRDD)
-    val parentRDD = operatedRDD.dependencies.head.rdd
-    val rddType = operatedRDD.getClass.getSimpleName
-    val parentRDDType = parentRDD.getClass.getSimpleName
-
-    // Get the partitions and dependencies of the parent in case they're lazily computed
-    parentRDD.dependencies
-    parentRDD.partitions
-
-    // Find serialized sizes before and after the checkpoint
-    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
-    parentRDD.checkpoint()  // checkpoint the parent RDD, not the generated one
-    val result = operatedRDD.collect()
-    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
-
-    // Test whether the data in the checkpointed RDD is same as original
-    assert(operatedRDD.collect() === result)
-
-    // Test whether the serialized size of the RDD has reduced because its parent was
-    // checkpointed. If this RDD or its parent RDD does not have any dependency
-    // on another RDD (e.g., ParallelCollection, ShuffledRDD with a ShuffleDependency), it may
-    // not reduce in size after checkpointing.
-    if (testRDDSize) {
-      assert(
-        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
-        "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType +
-          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
-      )
-    }
-
-    // Test whether the serialized size of the partitions has reduced because the parent was
-    // checkpointed. If the partitions do not hold any non-transient reference to another RDD
-    // or another RDD's partitions, they do not capture the lineage and therefore may not reduce
-    // in size after checkpointing. However, if the partitions do refer to the *partitions* of a
-    // parent RDD, then they must update their references to the parent's partitions, because
-    // those partitions must have changed after checkpointing.
-    if (testRDDPartitionSize) {
-      assert(
-        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
-        "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType +
-          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
-      )
-    }
-
-  }
-
-  /**
-   * Generate an RDD with a long lineage of one-to-one dependencies.
-   */
-  def generateLongLineageRDD(): RDD[Int] = {
-    var rdd = sc.makeRDD(1 to 100, 4)
-    for (i <- 1 to 50) {
-      rdd = rdd.map(x => x + 1)
-    }
-    rdd
-  }
-
-  /**
-   * Generate an RDD with a long lineage specifically for CoGroupedRDD.
-   * A CoGroupedRDD can have a long lineage only if one of its parents has a long lineage
-   * and a narrow dependency with this RDD. This method generates such an RDD through a
-   * sequence of cogroups and mapValues, which creates a long lineage of narrow dependencies.
-   */
-  def generateLongLineageRDDForCoGroupedRDD() = {
-    val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
-
-    def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
-
-    var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones)
-    for(i <- 1 to 10) {
-      cogrouped = cogrouped.mapValues(add).cogroup(ones)
-    }
-    cogrouped.mapValues(add)
-  }
-
-  /**
-   * Get serialized sizes of the RDD and its partitions, in order to test whether the size shrinks
-   * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
-   */
-  def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
-    (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length,
-     Utils.serialize(rdd.partitions).length)
-  }
-
-  /**
-   * Serialize and deserialize an object. This is useful for verifying an object's
-   * contents after deserialization (e.g., the contents of an RDD split after
-   * it is sent to a slave along with a task).
-   */
-  def serializeDeserialize[T](obj: T): T = {
-    val bytes = Utils.serialize(obj)
-    Utils.deserialize[T](bytes)
-  }
-}
-
-
-object CheckpointSuite {
-  // This is a custom cogroup function that, unlike PairRDDFunctions.cogroup(),
-  // does not use mapValues.
-  def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
-    //println("First = " + first + ", second = " + second)
-    new CoGroupedRDD[K](
-      Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
-      part
-    ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ClosureCleanerSuite.scala b/core/src/test/scala/spark/ClosureCleanerSuite.scala
deleted file mode 100644
index 7d2831e..0000000
--- a/core/src/test/scala/spark/ClosureCleanerSuite.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io.NotSerializableException
-
-import org.scalatest.FunSuite
-import spark.LocalSparkContext._
-import SparkContext._
-
-class ClosureCleanerSuite extends FunSuite {
-  test("closures inside an object") {
-    assert(TestObject.run() === 30) // 6 + 7 + 8 + 9
-  }
-
-  test("closures inside a class") {
-    val obj = new TestClass
-    assert(obj.run() === 30) // 6 + 7 + 8 + 9
-  }
-
-  test("closures inside a class with no default constructor") {
-    val obj = new TestClassWithoutDefaultConstructor(5)
-    assert(obj.run() === 30) // 6 + 7 + 8 + 9
-  }
-
-  test("closures that don't use fields of the outer class") {
-    val obj = new TestClassWithoutFieldAccess
-    assert(obj.run() === 30) // 6 + 7 + 8 + 9
-  }
-
-  test("nested closures inside an object") {
-    assert(TestObjectWithNesting.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
-  }
-
-  test("nested closures inside a class") {
-    val obj = new TestClassWithNesting(1)
-    assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
-  }
-}
-
-// A non-serializable class we create in closures to make sure that we aren't
-// keeping references to unneeded variables from our outer closures.
-class NonSerializable {}
-
-object TestObject {
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    var x = 5
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + x).reduce(_ + _)
-    }
-  }
-}
-
-class TestClass extends Serializable {
-  var x = 5
-  
-  def getX = x
-
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + getX).reduce(_ + _)
-    }
-  }
-}
-
-class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
-  def getX = x
-
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + getX).reduce(_ + _)
-    }
-  }
-}
-
-// This class is not serializable, but we aren't using any of its fields in our
-// closures, so they won't have a $outer pointing to it and should still work.
-class TestClassWithoutFieldAccess {
-  var nonSer = new NonSerializable
-
-  def run(): Int = {
-    var nonSer2 = new NonSerializable
-    var x = 5
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + x).reduce(_ + _)
-    }
-  }
-}
-
-
-object TestObjectWithNesting {
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    var answer = 0
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      var y = 1
-      for (i <- 1 to 4) {
-        var nonSer2 = new NonSerializable
-        var x = i
-        answer += nums.map(_ + x + y).reduce(_ + _)
-      }
-      answer
-    }
-  }
-}
-
-class TestClassWithNesting(val y: Int) extends Serializable {
-  def getY = y
-
-  def run(): Int = {
-    var nonSer = new NonSerializable
-    var answer = 0
-    return withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      for (i <- 1 to 4) {
-        var nonSer2 = new NonSerializable
-        var x = i
-        answer += nums.map(_ + x + getY).reduce(_ + _)
-      }
-      answer
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
deleted file mode 100644
index e11efe4..0000000
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import network.ConnectionManagerId
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Timeouts._
-import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.prop.Checkers
-import org.scalatest.time.{Span, Millis}
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
-import org.eclipse.jetty.server.{Server, Request, Handler}
-
-import com.google.common.io.Files
-
-import scala.collection.mutable.ArrayBuffer
-
-import SparkContext._
-import storage.{GetBlock, BlockManagerWorker, StorageLevel}
-import ui.JettyUtils
-
-
-class NotSerializableClass
-class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
-
-
-class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
-  with LocalSparkContext {
-
-  val clusterUrl = "local-cluster[2,1,512]"
-
-  after {
-    System.clearProperty("spark.reducer.maxMbInFlight")
-    System.clearProperty("spark.storage.memoryFraction")
-  }
-
-  test("task throws not serializable exception") {
-    // Ensures that executors do not crash when an exception is not serializable. If executors
-    // crash, this test will hang. The correct behavior is that executors don't crash but fail
-    // tasks, and the scheduler throws a SparkException.
-
-    // numSlaves must be less than numPartitions
-    val numSlaves = 3
-    val numPartitions = 10
-
-    sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
-    val data = sc.parallelize(1 to 100, numPartitions).
-      map(x => throw new NotSerializableExn(new NotSerializableClass))
-    intercept[SparkException] {
-      data.count()
-    }
-    resetSparkContext()
-  }
-
-  test("local-cluster format") {
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
-    assert(sc.parallelize(1 to 2, 2).count() == 2)
-    resetSparkContext()
-    sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
-    assert(sc.parallelize(1 to 2, 2).count() == 2)
-    resetSparkContext()
-    sc = new SparkContext("local-cluster[2, 1, 512]", "test")
-    assert(sc.parallelize(1 to 2, 2).count() == 2)
-    resetSparkContext()
-    sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
-    assert(sc.parallelize(1 to 2, 2).count() == 2)
-    resetSparkContext()
-  }
-
-  test("simple groupByKey") {
-    sc = new SparkContext(clusterUrl, "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
-    val groups = pairs.groupByKey(5).collect()
-    assert(groups.size === 2)
-    val valuesFor1 = groups.find(_._1 == 1).get._2
-    assert(valuesFor1.toList.sorted === List(1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
-  test("groupByKey where map output sizes exceed maxMbInFlight") {
-    System.setProperty("spark.reducer.maxMbInFlight", "1")
-    sc = new SparkContext(clusterUrl, "test")
-    // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
-    // file should be about 2.5 MB
-    val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
-    val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
-    assert(groups.length === 16)
-    assert(groups.map(_._2).sum === 2000)
-    // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
-  }
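
For reference, the "about 2.5 MB" in the comment above is straightforward arithmetic over the
values used in the test (a rough sketch, decimal megabytes):

    val records = 2000
    val bytesPerValue = 10000                       // each value is an Array[Byte](10000)
    val totalMB = records * bytesPerValue / 1e6     // ~20 MB of map output overall
    val perMapPerReduceMB = totalMB / 4 / 2         // 4 mappers x 2 reducers => ~2.5 MB each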
-
-  test("accumulators") {
-    sc = new SparkContext(clusterUrl, "test")
-    val accum = sc.accumulator(0)
-    sc.parallelize(1 to 10, 10).foreach(x => accum += x)
-    assert(accum.value === 55)
-  }
-
-  test("broadcast variables") {
-    sc = new SparkContext(clusterUrl, "test")
-    val array = new Array[Int](100)
-    val bv = sc.broadcast(array)
-    array(2) = 3     // Change the array -- this should not be seen on workers
-    val rdd = sc.parallelize(1 to 10, 10)
-    val sum = rdd.map(x => bv.value.sum).reduce(_ + _)
-    assert(sum === 0)
-  }
-
-  test("repeatedly failing task") {
-    sc = new SparkContext(clusterUrl, "test")
-    val accum = sc.accumulator(0)
-    val thrown = intercept[SparkException] {
-      sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
-    }
-    assert(thrown.getClass === classOf[SparkException])
-    assert(thrown.getMessage.contains("more than 4 times"))
-  }
-
-  test("caching") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).cache()
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching on disk") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching in memory, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching in memory, serialized, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching on disk, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching in memory and disk, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching in memory and disk, serialized, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
-
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-
-    // Get all the locations of the first partition and try to fetch the partitions
-    // from those locations.
-    val blockIds = data.partitions.indices.map(index => "rdd_%d_%d".format(data.id, index)).toArray
-    val blockId = blockIds(0)
-    val blockManager = SparkEnv.get.blockManager
-    blockManager.master.getLocations(blockId).foreach(id => {
-      val bytes = BlockManagerWorker.syncGetBlock(
-        GetBlock(blockId), ConnectionManagerId(id.host, id.port))
-      val deserialized = blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList
-      assert(deserialized === (1 to 100).toList)
-    })
-  }
-
-  test("compute without caching when no partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.0001")
-    sc = new SparkContext(clusterUrl, "test")
-    // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
-    // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
-    val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
-    assert(data.count() === 4000000)
-    assert(data.count() === 4000000)
-    assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
-  }
-
-  test("compute when only some partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.01")
-    sc = new SparkContext(clusterUrl, "test")
-    // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
-    // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
-    // to make sure that *some* of them do fit though
-    val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
-    assert(data.count() === 4000000)
-    assert(data.count() === 4000000)
-    assert(data.count() === 4000000)
-    System.clearProperty("spark.storage.memoryFraction")
-  }
-
-  test("passing environment variables to cluster") {
-    sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE"))
-    val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
-    assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
-  }
-
-  test("recover from node failures") {
-    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
-    DistributedSuite.amMaster = true
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(Seq(true, true), 2)
-    assert(data.count === 2) // force executors to start
-    assert(data.map(markNodeIfIdentity).collect.size === 2)
-    assert(data.map(failOnMarkedIdentity).collect.size === 2)
-  }
-
-  test("recover from repeated node failures during shuffle-map") {
-    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
-    DistributedSuite.amMaster = true
-    sc = new SparkContext(clusterUrl, "test")
-    for (i <- 1 to 3) {
-      val data = sc.parallelize(Seq(true, false), 2)
-      assert(data.count === 2)
-      assert(data.map(markNodeIfIdentity).collect.size === 2)
-      assert(data.map(failOnMarkedIdentity).map(x => x -> x).groupByKey.count === 2)
-    }
-  }
-
-  test("recover from repeated node failures during shuffle-reduce") {
-    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
-    DistributedSuite.amMaster = true
-    sc = new SparkContext(clusterUrl, "test")
-    for (i <- 1 to 3) {
-      val data = sc.parallelize(Seq(true, true), 2)
-      assert(data.count === 2)
-      assert(data.map(markNodeIfIdentity).collect.size === 2)
-      // For this test to actually be testing what it claims, it relies on mergeCombiners
-      // being used to perform the actual reduce.
-      val grouped = data.map(x => x -> x).combineByKey(
-                      x => x,
-                      (x: Boolean, y: Boolean) => x,
-                      (x: Boolean, y: Boolean) => failOnMarkedIdentity(x)
-                    )
-      assert(grouped.collect.size === 1)
-    }
-  }
-
-  test("recover from node failures with replication") {
-    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
-    DistributedSuite.amMaster = true
-    // Use more than two nodes so that we don't have a symmetric communication pattern, under
-    // which a partially correct cached list of peers could go unnoticed.
-    sc = new SparkContext("local-cluster[3,1,512]", "test")
-    for (i <- 1 to 3) {
-      val data = sc.parallelize(Seq(true, false, false, false), 4)
-      data.persist(StorageLevel.MEMORY_ONLY_2)
-
-      assert(data.count === 4)
-      assert(data.map(markNodeIfIdentity).collect.size === 4)
-      assert(data.map(failOnMarkedIdentity).collect.size === 4)
-
-      // Create a new replicated RDD to make sure that cached peer information doesn't cause
-      // problems.
-      val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
-      assert(data2.count === 2)
-    }
-  }
-
-  test("unpersist RDDs") {
-    DistributedSuite.amMaster = true
-    sc = new SparkContext("local-cluster[3,1,512]", "test")
-    val data = sc.parallelize(Seq(true, false, false, false), 4)
-    data.persist(StorageLevel.MEMORY_ONLY_2)
-    data.count
-    assert(sc.persistentRdds.isEmpty === false)
-    data.unpersist()
-    assert(sc.persistentRdds.isEmpty === true)
-
-    failAfter(Span(3000, Millis)) {
-      try {
-        while (! sc.getRDDStorageInfo.isEmpty) {
-          Thread.sleep(200)
-        }
-      } catch {
-        // Do nothing. We might see exceptions because the block manager
-        // is racing this thread to remove entries from the driver.
-        case _ => Thread.sleep(10)
-      }
-    }
-  }
-
-  test("job should fail if TaskResult exceeds Akka frame size") {
-    // We must use local-cluster mode since results are returned differently
-    // when running under LocalScheduler:
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
-    val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
-    val exception = intercept[SparkException] {
-      rdd.reduce((x, y) => x)
-    }
-    exception.getMessage should endWith("result exceeded Akka frame size")
-  }
-}
-
-object DistributedSuite {
-  // Indicates whether this JVM is marked for failure.
-  var mark = false
-
-  // Set by the test (which runs in the driver) to remember that we are in the driver program,
-  // so the functions below can assert that they are not running in it.
-  var amMaster = false
-
-  // Act like an identity function, but if the argument is true, set mark to true.
-  def markNodeIfIdentity(item: Boolean): Boolean = {
-    if (item) {
-      assert(!amMaster)
-      mark = true
-    }
-    item
-  }
-
-  // Act like an identity function, but if mark was set to true previously, fail,
-  // crashing the entire JVM.
-  def failOnMarkedIdentity(item: Boolean): Boolean = {
-    if (mark) {
-      System.exit(42)
-    }
-    item
-  }
-}