You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:52 UTC
[08/69] [abbrv] [partial] Initial work to rename package to
org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
new file mode 100644
index 0000000..88ba10f
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -0,0 +1,666 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.storage
+
+import java.nio.ByteBuffer
+
+import akka.actor._
+
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import org.scalatest.PrivateMethodTester
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.matchers.ShouldMatchers._
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark.JavaSerializer
+import org.apache.spark.KryoSerializer
+import org.apache.spark.SizeEstimator
+import org.apache.spark.Utils
+import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.ByteBufferInputStream
+
+
+/**
+ * Tests for [[BlockManager]] and its interaction with [[BlockManagerMaster]]: block
+ * put/get/remove, LRU eviction across storage levels, re-registration with the master,
+ * and per-block-type compression.
+ *
+ * Each test gets a fresh local ActorSystem and master from `before`; any BlockManager a
+ * test assigns to `store` / `store2` is stopped in `after`, and every system property
+ * overridden in `before` is restored there.
+ */
+class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
+  var store: BlockManager = null
+  var store2: BlockManager = null
+  var actorSystem: ActorSystem = null
+  var master: BlockManagerMaster = null
+  // Previous values of the system properties overridden in `before` (null when unset);
+  // `after` restores them.
+  var oldArch: String = null
+  var oldOops: String = null
+  var oldHeartBeat: String = null
+
+  // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
+  // NOTE(review): this property is set once at construction time and never cleared.
+  System.setProperty("spark.kryoserializer.buffer.mb", "1")
+  val serializer = new KryoSerializer
+
+  before {
+    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("test", "localhost", 0)
+    this.actorSystem = actorSystem
+    System.setProperty("spark.driver.port", boundPort.toString)
+    System.setProperty("spark.hostPort", "localhost:" + boundPort)
+
+    master = new BlockManagerMaster(
+      actorSystem.actorOf(Props(new BlockManagerMasterActor(true))))
+
+    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
+    oldArch = System.setProperty("os.arch", "amd64")
+    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
+    // Turn off BlockManager heartbeats (per the property name); the re-registration tests
+    // call the private heartBeat() directly instead.
+    oldHeartBeat = System.setProperty("spark.storage.disableBlockManagerHeartBeat", "true")
+    // Re-run SizeEstimator's private initialize(), presumably so it re-reads the
+    // arch / compressed-oops properties set just above.
+    val initialize = PrivateMethod[Unit]('initialize)
+    SizeEstimator invokePrivate initialize()
+    // Overrides the spark.hostPort value set above with a fixed placeholder port.
+    // NOTE(review): unclear why the bound port is discarded here — confirm intent.
+    System.setProperty("spark.hostPort", Utils.localHostName() + ":" + 1111)
+  }
+
+  after {
+    System.clearProperty("spark.driver.port")
+    System.clearProperty("spark.hostPort")
+
+    if (store != null) {
+      store.stop()
+      store = null
+    }
+    if (store2 != null) {
+      store2.stop()
+      store2 = null
+    }
+    actorSystem.shutdown()
+    actorSystem.awaitTermination()
+    actorSystem = null
+    master = null
+
+    restoreProperty("os.arch", oldArch)
+    restoreProperty("spark.test.useCompressedOops", oldOops)
+    // Restore the heartbeat override too, so it does not leak into later suites.
+    restoreProperty("spark.storage.disableBlockManagerHeartBeat", oldHeartBeat)
+  }
+
+  /** Sets system property `key` back to `oldValue`, or clears it when `oldValue` is null. */
+  private def restoreProperty(key: String, oldValue: String) {
+    if (oldValue != null) {
+      System.setProperty(key, oldValue)
+    } else {
+      System.clearProperty(key)
+    }
+  }
+
+  test("StorageLevel object caching") {
+    val level1 = StorageLevel(false, false, false, 3)
+    val level2 = StorageLevel(false, false, false, 3) // this should return the same object as level1
+    val level3 = StorageLevel(false, false, false, 2) // this should return a different object
+    assert(level2 === level1, "level2 is not same as level1")
+    assert(level2.eq(level1), "level2 is not the same object as level1")
+    assert(level3 != level1, "level3 is same as level1")
+    val bytes1 = Utils.serialize(level1)
+    val level1_ = Utils.deserialize[StorageLevel](bytes1)
+    val bytes2 = Utils.serialize(level2)
+    val level2_ = Utils.deserialize[StorageLevel](bytes2)
+    assert(level1_ === level1, "Deserialized level1 not same as original level1")
+    assert(level1_.eq(level1), "Deserialized level1 not the same object as original level1")
+    assert(level2_ === level2, "Deserialized level2 not same as original level2")
+    assert(level2_.eq(level1), "Deserialized level2 not the same object as original level1")
+  }
+
+  test("BlockManagerId object caching") {
+    val id1 = BlockManagerId("e1", "XXX", 1, 0)
+    val id2 = BlockManagerId("e1", "XXX", 1, 0) // this should return the same object as id1
+    val id3 = BlockManagerId("e1", "XXX", 2, 0) // this should return a different object
+    assert(id2 === id1, "id2 is not same as id1")
+    assert(id2.eq(id1), "id2 is not the same object as id1")
+    assert(id3 != id1, "id3 is same as id1")
+    val bytes1 = Utils.serialize(id1)
+    val id1_ = Utils.deserialize[BlockManagerId](bytes1)
+    val bytes2 = Utils.serialize(id2)
+    val id2_ = Utils.deserialize[BlockManagerId](bytes2)
+    assert(id1_ === id1, "Deserialized id1 is not same as original id1")
+    assert(id1_.eq(id1), "Deserialized id1 is not the same object as original id1")
+    assert(id2_ === id2, "Deserialized id2 is not same as original id2")
+    assert(id2_.eq(id1), "Deserialized id2 is not the same object as original id1")
+  }
+
+  test("master + 1 manager interaction") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+
+    // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+
+    // Checking whether blocks are in memory
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+
+    // Checking whether master knows about the blocks or not
+    assert(master.getLocations("a1").size > 0, "master was not told about a1")
+    assert(master.getLocations("a2").size > 0, "master was not told about a2")
+    assert(master.getLocations("a3").size === 0, "master was told about a3")
+
+    // Drop a1 and a2 from memory; this should be reported back to the master
+    store.dropFromMemory("a1", null)
+    store.dropFromMemory("a2", null)
+    assert(store.getSingle("a1") === None, "a1 not removed from store")
+    assert(store.getSingle("a2") === None, "a2 not removed from store")
+    assert(master.getLocations("a1").size === 0, "master did not remove a1")
+    assert(master.getLocations("a2").size === 0, "master did not remove a2")
+  }
+
+  test("master + 2 managers interaction") {
+    store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+    store2 = new BlockManager("exec2", actorSystem, master, new KryoSerializer, 2000)
+
+    val peers = master.getPeers(store.blockManagerId, 1)
+    assert(peers.size === 1, "master did not return the other manager as a peer")
+    assert(peers.head === store2.blockManagerId, "peer returned by master is not the other manager")
+
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
+    store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
+    assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
+    assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
+  }
+
+  test("removing block") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+
+    // Putting a1, a2 and a3 in memory and telling master only about a1 and a2
+    store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
+
+    // Checking whether blocks are in memory and memory size
+    val memStatus = master.getMemoryStatus.head._2
+    assert(memStatus._1 == 2000L, "total memory " + memStatus._1 + " should equal 2000")
+    assert(memStatus._2 <= 1200L, "remaining memory " + memStatus._2 + " should be <= 1200")
+    assert(store.getSingle("a1-to-remove") != None, "a1 was not in store")
+    assert(store.getSingle("a2-to-remove") != None, "a2 was not in store")
+    assert(store.getSingle("a3-to-remove") != None, "a3 was not in store")
+
+    // Checking whether master knows about the blocks or not
+    assert(master.getLocations("a1-to-remove").size > 0, "master was not told about a1")
+    assert(master.getLocations("a2-to-remove").size > 0, "master was not told about a2")
+    assert(master.getLocations("a3-to-remove").size === 0, "master was told about a3")
+
+    // Remove a1 and a2 and a3. Should be no-op for a3.
+    master.removeBlock("a1-to-remove")
+    master.removeBlock("a2-to-remove")
+    master.removeBlock("a3-to-remove")
+
+    // Removal is checked with `eventually`, i.e. it may complete after removeBlock returns.
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("a1-to-remove") should be (None)
+      master.getLocations("a1-to-remove") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("a2-to-remove") should be (None)
+      master.getLocations("a2-to-remove") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("a3-to-remove") should not be (None)
+      master.getLocations("a3-to-remove") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      val memStatus = master.getMemoryStatus.head._2
+      memStatus._1 should equal (2000L)
+      memStatus._2 should equal (2000L)
+    }
+  }
+
+  test("removing rdd") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    // Putting a1, a2 and a3 in memory.
+    store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
+    master.removeRdd(0, blocking = false)
+
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("rdd_0_0") should be (None)
+      master.getLocations("rdd_0_0") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("rdd_0_1") should be (None)
+      master.getLocations("rdd_0_1") should have size 0
+    }
+    eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
+      store.getSingle("nonrddblock") should not be (None)
+      master.getLocations("nonrddblock") should have size (1)
+    }
+
+    // With blocking = true the removal is expected to be visible immediately.
+    store.putSingle("rdd_0_0", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_1", a2, StorageLevel.MEMORY_ONLY)
+    master.removeRdd(0, blocking = true)
+    store.getSingle("rdd_0_0") should be (None)
+    master.getLocations("rdd_0_0") should have size 0
+    store.getSingle("rdd_0_1") should be (None)
+    master.getLocations("rdd_0_1") should have size 0
+  }
+
+  test("reregistration on heart beat") {
+    val heartBeat = PrivateMethod[Unit]('heartBeat)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(master.getLocations("a1").size > 0, "master was not told about a1")
+
+    master.removeExecutor(store.blockManagerId.executorId)
+    assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
+    store invokePrivate heartBeat()
+    assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+  }
+
+  test("reregistration on block update") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    assert(master.getLocations("a1").size > 0, "master was not told about a1")
+
+    master.removeExecutor(store.blockManagerId.executorId)
+    assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
+
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.waitForAsyncReregister()
+
+    assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
+    assert(master.getLocations("a2").size > 0, "master was not told about a2")
+  }
+
+  test("reregistration doesn't dead lock") {
+    val heartBeat = PrivateMethod[Unit]('heartBeat)
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 2000)
+    val a1 = new Array[Byte](400)
+    val a2 = List(new Array[Byte](400))
+
+    // try many times to trigger any deadlocks
+    for (i <- 1 to 100) {
+      master.removeExecutor(store.blockManagerId.executorId)
+      val t1 = new Thread {
+        override def run() {
+          store.put("a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+        }
+      }
+      val t2 = new Thread {
+        override def run() {
+          store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+        }
+      }
+      val t3 = new Thread {
+        override def run() {
+          store invokePrivate heartBeat()
+        }
+      }
+
+      t1.start()
+      t2.start()
+      t3.start()
+      t1.join()
+      t2.join()
+      t3.join()
+
+      store.dropFromMemory("a1", null)
+      store.dropFromMemory("a2", null)
+      store.waitForAsyncReregister()
+    }
+  }
+
+  test("in-memory LRU storage") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY)
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.getSingle("a1") === None, "a1 was in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    // At this point a2 was gotten last, so LRU will get rid of a3
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") === None, "a3 was in store")
+  }
+
+  test("in-memory LRU storage with serialization") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.getSingle("a1") === None, "a1 was in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    // At this point a2 was gotten last, so LRU will get rid of a3
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") === None, "a3 was in store")
+  }
+
+  test("in-memory LRU for partitions of same RDD") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
+    // Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
+    // from the same RDD
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+    assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store")
+    // Check that rdd_0_3 doesn't replace them even after further accesses
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+    assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
+  }
+
+  test("in-memory LRU for partitions of multiple RDDs") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    // At this point rdd_1_1 should've replaced rdd_0_1
+    assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
+    assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+    assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+    // Do a get() on rdd_0_2 so that it is the most recently used item
+    assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
+    // Put in more partitions from RDD 0; they should replace rdd_1_1
+    store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
+    // Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
+    // when we try to add rdd_0_4.
+    assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
+    assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
+    assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store")
+    assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
+    assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store")
+  }
+
+  test("on-disk storage") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+    store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+  }
+
+  test("disk and memory storage") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
+  test("disk and memory storage with getLocalBytes") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
+    assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+    assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
+  test("disk and memory storage with serialization") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
+  test("disk and memory storage with serialization and getLocalBytes") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
+    store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
+    assert(store.getLocalBytes("a2") != None, "a2 was not in store")
+    assert(store.getLocalBytes("a3") != None, "a3 was not in store")
+    assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
+    assert(store.getLocalBytes("a1") != None, "a1 was not in store")
+    assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
+  }
+
+  test("LRU with mixed storage levels") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val a1 = new Array[Byte](400)
+    val a2 = new Array[Byte](400)
+    val a3 = new Array[Byte](400)
+    val a4 = new Array[Byte](400)
+    // First store a1 and a2, both in memory, and a3, on disk only
+    store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
+    store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
+    // At this point LRU should not kick in because a3 is only on disk
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.getSingle("a1") != None, "a1 was not in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    // Now let's add in a4, which uses both disk and memory; a1 should drop out
+    store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
+    assert(store.getSingle("a1") == None, "a1 was in store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+    assert(store.getSingle("a3") != None, "a3 was not in store")
+    assert(store.getSingle("a4") != None, "a4 was not in store")
+  }
+
+  test("in-memory LRU with streams") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val list1 = List(new Array[Byte](200), new Array[Byte](200))
+    val list2 = List(new Array[Byte](200), new Array[Byte](200))
+    val list3 = List(new Array[Byte](200), new Array[Byte](200))
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    store.put("list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.get("list2") != None, "list2 was not in store")
+    assert(store.get("list2").get.size == 2)
+    assert(store.get("list3") != None, "list3 was not in store")
+    assert(store.get("list3").get.size == 2)
+    assert(store.get("list1") === None, "list1 was in store")
+    assert(store.get("list2") != None, "list2 was not in store")
+    assert(store.get("list2").get.size == 2)
+    // At this point list2 was gotten last, so LRU will get rid of list3
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
+    assert(store.get("list1") != None, "list1 was not in store")
+    assert(store.get("list1").get.size == 2)
+    assert(store.get("list2") != None, "list2 was not in store")
+    assert(store.get("list2").get.size == 2)
+    assert(store.get("list3") === None, "list3 was in store")
+  }
+
+  test("LRU with mixed storage levels and streams") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 1200)
+    val list1 = List(new Array[Byte](200), new Array[Byte](200))
+    val list2 = List(new Array[Byte](200), new Array[Byte](200))
+    val list3 = List(new Array[Byte](200), new Array[Byte](200))
+    val list4 = List(new Array[Byte](200), new Array[Byte](200))
+    // First store list1 and list2, both in memory, and list3, on disk only
+    store.put("list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.put("list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
+    store.put("list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
+    // At this point LRU should not kick in because list3 is only on disk
+    assert(store.get("list1") != None, "list1 was not in store")
+    assert(store.get("list1").get.size === 2)
+    assert(store.get("list2") != None, "list2 was not in store")
+    assert(store.get("list2").get.size === 2)
+    assert(store.get("list3") != None, "list3 was not in store")
+    assert(store.get("list3").get.size === 2)
+    assert(store.get("list1") != None, "list1 was not in store")
+    assert(store.get("list1").get.size === 2)
+    assert(store.get("list2") != None, "list2 was not in store")
+    assert(store.get("list2").get.size === 2)
+    assert(store.get("list3") != None, "list3 was not in store")
+    assert(store.get("list3").get.size === 2)
+    // Now let's add in list4, which uses both disk and memory; list1 should drop out
+    store.put("list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
+    assert(store.get("list1") === None, "list1 was in store")
+    assert(store.get("list2") != None, "list2 was not in store")
+    assert(store.get("list2").get.size === 2)
+    assert(store.get("list3") != None, "list3 was not in store")
+    assert(store.get("list3").get.size === 2)
+    assert(store.get("list4") != None, "list4 was not in store")
+    assert(store.get("list4").get.size === 2)
+  }
+
+  test("negative byte values in ByteBufferInputStream") {
+    // 254 and 255 become negative Bytes; read() must still return them as 0..255.
+    val buffer = ByteBuffer.wrap(Array[Int](254, 255, 0, 1, 2).map(_.toByte))
+    val stream = new ByteBufferInputStream(buffer)
+    val temp = new Array[Byte](10)
+    assert(stream.read() === 254, "unexpected byte read")
+    assert(stream.read() === 255, "unexpected byte read")
+    assert(stream.read() === 0, "unexpected byte read")
+    assert(stream.read(temp, 0, temp.length) === 2, "unexpected number of bytes read")
+    assert(stream.read() === -1, "end of stream not signalled")
+    assert(stream.read(temp, 0, temp.length) === -1, "end of stream not signalled")
+  }
+
+  test("overly large block") {
+    store = new BlockManager("<driver>", actorSystem, master, serializer, 500)
+    store.putSingle("a1", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+    assert(store.getSingle("a1") === None, "a1 was in store")
+    store.putSingle("a2", new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK)
+    assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
+    assert(store.getSingle("a2") != None, "a2 was not in store")
+  }
+
+  test("block compression") {
+    try {
+      System.setProperty("spark.shuffle.compress", "true")
+      store = new BlockManager("exec1", actorSystem, master, serializer, 2000)
+      store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("shuffle_0_0_0") <= 100, "shuffle_0_0_0 was not compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.shuffle.compress", "false")
+      store = new BlockManager("exec2", actorSystem, master, serializer, 2000)
+      store.putSingle("shuffle_0_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("shuffle_0_0_0") >= 1000, "shuffle_0_0_0 was compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.broadcast.compress", "true")
+      store = new BlockManager("exec3", actorSystem, master, serializer, 2000)
+      store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("broadcast_0") <= 100, "broadcast_0 was not compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.broadcast.compress", "false")
+      store = new BlockManager("exec4", actorSystem, master, serializer, 2000)
+      store.putSingle("broadcast_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("broadcast_0") >= 1000, "broadcast_0 was compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.rdd.compress", "true")
+      store = new BlockManager("exec5", actorSystem, master, serializer, 2000)
+      store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("rdd_0_0") <= 100, "rdd_0_0 was not compressed")
+      store.stop()
+      store = null
+
+      System.setProperty("spark.rdd.compress", "false")
+      store = new BlockManager("exec6", actorSystem, master, serializer, 2000)
+      store.putSingle("rdd_0_0", new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
+      assert(store.memoryStore.getSize("rdd_0_0") >= 1000, "rdd_0_0 was compressed")
+      store.stop()
+      store = null
+
+      // Check that any other block types are also kept uncompressed
+      store = new BlockManager("exec7", actorSystem, master, serializer, 2000)
+      store.putSingle("other_block", new Array[Byte](1000), StorageLevel.MEMORY_ONLY)
+      assert(store.memoryStore.getSize("other_block") >= 1000, "other_block was compressed")
+      store.stop()
+      store = null
+    } finally {
+      System.clearProperty("spark.shuffle.compress")
+      System.clearProperty("spark.broadcast.compress")
+      System.clearProperty("spark.rdd.compress")
+    }
+  }
+
+  test("block store put failure") {
+    // Use Java serializer so we can create an unserializable error.
+    store = new BlockManager("<driver>", actorSystem, master, new JavaSerializer, 1200)
+
+    // The put should fail since a1 is not serializable.
+    class UnserializableClass
+    val a1 = new UnserializableClass
+    intercept[java.io.NotSerializableException] {
+      store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
+    }
+
+    // Make sure get a1 doesn't hang and returns None.
+    failAfter(1 second) {
+      assert(store.getSingle("a1") == None, "a1 should not be in store")
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/ui/UISuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
new file mode 100644
index 0000000..3321fb5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+import scala.util.{Failure, Success, Try}
+import java.net.ServerSocket
+import org.scalatest.FunSuite
+import org.eclipse.jetty.server.Server
+
+/**
+ * Tests for the port-selection behavior of [[JettyUtils.startJettyServer]].
+ * Every server started here is stopped again, so ports do not leak between tests.
+ */
+class UISuite extends FunSuite {
+
+  /**
+   * Runs `body` and then stops `server`, even when `body` throws.
+   *
+   * @param server a started Jetty server to shut down afterwards
+   * @param body   the code to run while `server` is up
+   * @return whatever `body` returns
+   */
+  private def stopAfter[T](server: Server)(body: => T): T = {
+    try {
+      body
+    } finally {
+      server.stop()
+    }
+  }
+
+  test("jetty port increases under contention") {
+    val startPort = 3030
+    // Occupy startPort with a plain Jetty server so that JettyUtils cannot
+    // bind it and has to move on to higher ports.
+    val server = new Server(startPort)
+    server.start()
+    stopAfter(server) {
+      val (jettyServer1, boundPort1) = JettyUtils.startJettyServer("localhost", startPort, Seq())
+      stopAfter(jettyServer1) {
+        val (jettyServer2, boundPort2) = JettyUtils.startJettyServer("localhost", startPort, Seq())
+        stopAfter(jettyServer2) {
+          // Allow some wiggle room in case ports on the machine are under contention
+          assert(boundPort1 > startPort && boundPort1 < startPort + 10)
+          assert(boundPort2 > boundPort1 && boundPort2 < boundPort1 + 10)
+        }
+      }
+    }
+  }
+
+  test("jetty binds to port 0 correctly") {
+    val (jettyServer, boundPort) = JettyUtils.startJettyServer("localhost", 0, Seq())
+    stopAfter(jettyServer) {
+      assert(jettyServer.getState === "STARTED")
+      assert(boundPort != 0)
+      // If jetty really holds boundPort, binding another socket to it must fail.
+      Try(new ServerSocket(boundPort)) match {
+        case Success(socket) =>
+          // Release the port we unexpectedly obtained before failing the test.
+          socket.close()
+          fail("Port %s doesn't seem used by jetty server".format(boundPort))
+        case Failure(_) => // expected: the port is already bound by jetty
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala b/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala
new file mode 100644
index 0000000..6364246
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/DistributionSuite.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+
+/**
+ * Checks the summary statistics and the quantiles reported by [[Distribution]]
+ * for the values 1.0, 2.0, ..., 100.0.
+ */
+class DistributionSuite extends FunSuite with ShouldMatchers {
+  test("summary") {
+    val d = new Distribution((1 to 100).map(_.toDouble).toArray)
+    val stats = d.statCounter
+    stats.count should be (100)
+    // mean and sum are Doubles, so compare them against Double literals.
+    stats.mean should be (50.5)
+    stats.sum should be (5050.0) // 1 + 2 + ... + 100 = 50 * 101
+
+    // getQuantiles() with default arguments; the five expected values below
+    // are the smallest element, three interior points and the largest element.
+    val quantiles = d.getQuantiles()
+    quantiles(0) should be (1.0)
+    quantiles(1) should be (26.0)
+    quantiles(2) should be (51.0)
+    quantiles(3) should be (76.0)
+    quantiles(4) should be (100.0)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/util/FakeClock.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/FakeClock.scala b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
new file mode 100644
index 0000000..0a45917
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/FakeClock.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+/**
+ * A manually driven [[Clock]] for tests. Time starts at 0 and changes only
+ * when `advance` is called.
+ */
+class FakeClock extends Clock {
+  // Accumulated fake time, in the same unit as the `millis` passed to advance().
+  private var elapsedMillis: Long = 0L
+
+  /** Adds `millis` to the current fake time (the value is used as-is). */
+  def advance(millis: Long) {
+    elapsedMillis = elapsedMillis + millis
+  }
+
+  /** Returns the current fake time. */
+  def getTime(): Long = elapsedMillis
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
new file mode 100644
index 0000000..4586746
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/NextIteratorSuite.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import scala.collection.mutable.Buffer
+import java.util.NoSuchElementException
+
+/**
+ * Tests for [[NextIterator]]: element delivery, exhaustion behavior and the
+ * guarantee that close() runs exactly once.
+ */
+class NextIteratorSuite extends FunSuite with ShouldMatchers {
+  test("one iteration") {
+    val i = new StubIterator(Buffer(1))
+    i.hasNext should be === true
+    i.next() should be === 1
+    i.hasNext should be === false
+    intercept[NoSuchElementException] { i.next() }
+  }
+
+  test("two iterations") {
+    val i = new StubIterator(Buffer(1, 2))
+    i.hasNext should be === true
+    i.next() should be === 1
+    i.hasNext should be === true
+    i.next() should be === 2
+    i.hasNext should be === false
+    intercept[NoSuchElementException] { i.next() }
+  }
+
+  test("empty iteration") {
+    val i = new StubIterator(Buffer())
+    i.hasNext should be === false
+    intercept[NoSuchElementException] { i.next() }
+  }
+
+  test("close is called once for empty iterations") {
+    val i = new StubIterator(Buffer())
+    i.hasNext should be === false
+    i.hasNext should be === false
+    i.closeCalled should be === 1
+  }
+
+  test("close is called once for non-empty iterations") {
+    val i = new StubIterator(Buffer(1, 2))
+    i.next() should be === 1
+    i.next() should be === 2
+    // close isn't called until we check for the next element
+    i.closeCalled should be === 0
+    i.hasNext should be === false
+    i.closeCalled should be === 1
+    i.hasNext should be === false
+    i.closeCalled should be === 1
+  }
+
+  /**
+   * A [[NextIterator]] backed by a mutable buffer.
+   *
+   * Each getNext() call removes and returns the head of `ints`. Once the buffer
+   * is empty, getNext() sets `finished` and returns a dummy 0. The tests above
+   * expect that 0 never to be handed out (next() throws instead).
+   *
+   * @param ints the values to yield, consumed destructively in order
+   */
+  class StubIterator(ints: Buffer[Int]) extends NextIterator[Int] {
+    // Number of times close() has been invoked.
+    var closeCalled = 0
+
+    override def getNext() = {
+      if (ints.isEmpty) {
+        finished = true
+        0
+      } else {
+        ints.remove(0)
+      }
+    }
+
+    override def close() {
+      closeCalled += 1
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
new file mode 100644
index 0000000..a9dd0b1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/RateLimitedOutputStreamSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.scalatest.FunSuite
+import java.io.ByteArrayOutputStream
+import java.util.concurrent.TimeUnit._
+
+/** Tests that [[RateLimitedOutputStream]] throttles writes and passes the data through unchanged. */
+class RateLimitedOutputStreamSuite extends FunSuite {
+
+  /**
+   * Evaluates `f` once and returns the wall-clock time it took.
+   *
+   * @return elapsed time in nanoseconds, measured with System.nanoTime
+   */
+  private def benchmark[U](f: => U): Long = {
+    val start = System.nanoTime
+    f
+    System.nanoTime - start
+  }
+
+  test("write") {
+    val underlying = new ByteArrayOutputStream
+    val data = "X" * 41000
+    // The second argument is presumably a limit of 10000 bytes per second, so
+    // writing 41000 bytes should take a little over 4 seconds. SECONDS.convert
+    // truncates, hence the expected value of exactly 4.
+    val stream = new RateLimitedOutputStream(underlying, 10000)
+    val elapsedNs = try {
+      benchmark { stream.write(data.getBytes("UTF-8")) }
+    } finally {
+      stream.close()
+    }
+    // === reports both sides on failure, unlike a bare ==.
+    assert(SECONDS.convert(elapsedNs, NANOSECONDS) === 4)
+    assert(underlying.toString("UTF-8") === data)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/AccumulatorSuite.scala b/core/src/test/scala/spark/AccumulatorSuite.scala
deleted file mode 100644
index 0af175f..0000000
--- a/core/src/test/scala/spark/AccumulatorSuite.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-import collection.mutable
-import java.util.Random
-import scala.math.exp
-import scala.math.signum
-import spark.SparkContext._
-
-/**
- * Tests for accumulators created through SparkContext. Covers:
- *  - numeric accumulators;
- *  - a custom Accumulable backed by a mutable set (SetAccum);
- *  - accumulableCollection.
- */
-class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
-
-  test ("basic accumulation"){
-    sc = new SparkContext("local", "test")
-    val acc : Accumulator[Int] = sc.accumulator(0)
-
-    val d = sc.parallelize(1 to 20)
-    d.foreach{x => acc += x}
-    // 1 + 2 + ... + 20 = 210
-    acc.value should be (210)
-
-
-    // Every addend exceeds Int.MaxValue, so this checks that a Long
-    // accumulator sums without Int overflow.
-    val longAcc = sc.accumulator(0l)
-    val maxInt = Integer.MAX_VALUE.toLong
-    d.foreach{x => longAcc += maxInt + x}
-    longAcc.value should be (210l + maxInt * 20)
-  }
-
-  test ("value not assignable from tasks") {
-    sc = new SparkContext("local", "test")
-    val acc : Accumulator[Int] = sc.accumulator(0)
-
-    val d = sc.parallelize(1 to 20)
-    // Assigning acc.value inside a task is expected to throw.
-    evaluating {d.foreach{x => acc.value = x}} should produce [Exception]
-  }
-
-  test ("add value to collection accumulators") {
-    import SetAccum._
-    val maxI = 1000
-    for (nThreads <- List(1, 10)) { //test single & multi-threaded
-      sc = new SparkContext("local[" + nThreads + "]", "test")
-      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
-      val d = sc.parallelize(1 to maxI)
-      d.foreach {
-        x => acc += x
-      }
-      // Every value added inside a task must show up in the driver-side value.
-      val v = acc.value.asInstanceOf[mutable.Set[Int]]
-      for (i <- 1 to maxI) {
-        v should contain(i)
-      }
-      // LocalSparkContext helper; presumably stops sc so the next iteration can
-      // create a fresh context.
-      resetSparkContext()
-    }
-  }
-
-  /**
-   * AccumulableParam that accumulates arbitrary values into a mutable Set[Any].
-   */
-  implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
-    // Merges t2 into t1 in place and returns t1.
-    def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
-      t1 ++= t2
-      t1
-    }
-    // Adds the single value t2 to t1 in place and returns t1.
-    def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
-      t1 += t2
-      t1
-    }
-    // Returns a fresh, empty set; the argument is ignored.
-    def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
-      new mutable.HashSet[Any]()
-    }
-  }
-
-  test ("value not readable in tasks") {
-    import SetAccum._
-    val maxI = 1000
-    for (nThreads <- List(1, 10)) { //test single & multi-threaded
-      sc = new SparkContext("local[" + nThreads + "]", "test")
-      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
-      val d = sc.parallelize(1 to maxI)
-      // Reading acc.value inside a task is expected to fail the job with a SparkException.
-      evaluating {
-        d.foreach {
-          x => acc.value += x
-        }
-      } should produce [SparkException]
-      resetSparkContext()
-    }
-  }
-
-  test ("collection accumulators") {
-    val maxI = 1000
-    for (nThreads <- List(1, 10)) {
-      // test single & multi-threaded
-      sc = new SparkContext("local[" + nThreads + "]", "test")
-      val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
-      val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
-      val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
-      // Every value in 1..maxI appears twice in the input.
-      val d = sc.parallelize((1 to maxI) ++ (1 to maxI))
-      d.foreach {
-        x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)}
-      }
-
-      // Note that this is typed correctly -- no casts necessary
-      // The set and the map de-duplicate the repeated values; the buffer keeps both copies.
-      setAcc.value.size should be (maxI)
-      bufferAcc.value.size should be (2 * maxI)
-      mapAcc.value.size should be (maxI)
-      for (i <- 1 to maxI) {
-        setAcc.value should contain(i)
-        bufferAcc.value should contain(i)
-        mapAcc.value should contain (i -> i.toString)
-      }
-      resetSparkContext()
-    }
-  }
-
-  test ("localValue readable in tasks") {
-    import SetAccum._
-    val maxI = 1000
-    for (nThreads <- List(1, 10)) { //test single & multi-threaded
-      sc = new SparkContext("local[" + nThreads + "]", "test")
-      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
-      // The sets [20 * (x - 1), 20 * x] for x in 1..maxI/20 together cover 0 to maxI.
-      val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
-      val d = sc.parallelize(groupedInts)
-      // Tasks add to acc.localValue. The assertion below checks that the
-      // driver-side value contains the union of all of them.
-      d.foreach {
-        x => acc.localValue ++= x
-      }
-      acc.value should be ( (0 to maxI).toSet)
-      resetSparkContext()
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/BroadcastSuite.scala b/core/src/test/scala/spark/BroadcastSuite.scala
deleted file mode 100644
index 785721e..0000000
--- a/core/src/test/scala/spark/BroadcastSuite.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-
-/** Tests that broadcast variables created on the driver can be read inside tasks. */
-class BroadcastSuite extends FunSuite with LocalSparkContext {
-
-  test("basic broadcast") {
-    sc = new SparkContext("local", "test")
-    val list = List(1, 2, 3, 4)
-    val listBroadcast = sc.broadcast(list)
-    // Every task reads the broadcast list; its sum is 1 + 2 + 3 + 4 = 10.
-    val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
-    assert(results.collect.toSet === Set((1, 10), (2, 10)))
-  }
-
-  test("broadcast variables accessed in multiple threads") {
-    // Same check as above, with a local[10] master and ten elements.
-    sc = new SparkContext("local[10]", "test")
-    val list = List(1, 2, 3, 4)
-    val listBroadcast = sc.broadcast(list)
-    val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
-    assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
deleted file mode 100644
index 966dede..0000000
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ /dev/null
@@ -1,392 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import java.io.File
-import spark.rdd._
-import spark.SparkContext._
-import storage.StorageLevel
-
-/**
- * Tests that checkpointing truncates RDD lineage while preserving data,
- * partition counts and results, for each RDD type that supports it.
- */
-class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
-  initLogging()
-
-  var checkpointDir: File = _
-  val partitioner = new HashPartitioner(2)
-
-  override def beforeEach() {
-    super.beforeEach()
-    // Reserve a unique temp path, then delete the file so the path itself can
-    // be handed to setCheckpointDir.
-    checkpointDir = File.createTempFile("temp", "")
-    checkpointDir.delete()
-    sc = new SparkContext("local", "test")
-    sc.setCheckpointDir(checkpointDir.toString)
-  }
-
-  override def afterEach() {
-    super.afterEach()
-    // NOTE(review): File.delete() does not remove a non-empty directory, so
-    // checkpoint files written during the test may be left behind.
-    if (checkpointDir != null) {
-      checkpointDir.delete()
-    }
-  }
-
-  test("basic checkpointing") {
-    val parCollection = sc.makeRDD(1 to 4)
-    val flatMappedRDD = parCollection.flatMap(x => 1 to x)
-    flatMappedRDD.checkpoint()
-    // checkpoint() alone does not change the lineage; the first action does.
-    assert(flatMappedRDD.dependencies.head.rdd == parCollection)
-    val result = flatMappedRDD.collect()
-    assert(flatMappedRDD.dependencies.head.rdd != parCollection)
-    assert(flatMappedRDD.collect() === result)
-  }
-
-  test("RDDs with one-to-one dependencies") {
-    testCheckpointing(_.map(x => x.toString))
-    testCheckpointing(_.flatMap(x => 1 to x))
-    testCheckpointing(_.filter(_ % 2 == 0))
-    testCheckpointing(_.sample(false, 0.5, 0))
-    testCheckpointing(_.glom())
-    testCheckpointing(_.mapPartitions(_.map(_.toString)))
-    testCheckpointing(r => new MapPartitionsWithIndexRDD(r,
-      (i: Int, iter: Iterator[Int]) => iter.map(_.toString), false ))
-    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
-    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
-    testCheckpointing(_.pipe(Seq("cat")))
-  }
-
-  test("ParallelCollection") {
-    val parCollection = sc.makeRDD(1 to 4, 2)
-    val numPartitions = parCollection.partitions.size
-    parCollection.checkpoint()
-    // A ParallelCollection has no dependencies until it is checkpointed.
-    assert(parCollection.dependencies === Nil)
-    val result = parCollection.collect()
-    assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result)
-    assert(parCollection.dependencies != Nil)
-    assert(parCollection.partitions.length === numPartitions)
-    assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList)
-    assert(parCollection.collect() === result)
-  }
-
-  test("BlockRDD") {
-    val blockId = "id"
-    val blockManager = SparkEnv.get.blockManager
-    // Store a single block so the BlockRDD has something to read.
-    blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY)
-    val blockRDD = new BlockRDD[String](sc, Array(blockId))
-    val numPartitions = blockRDD.partitions.size
-    blockRDD.checkpoint()
-    val result = blockRDD.collect()
-    assert(sc.checkpointFile[String](blockRDD.getCheckpointFile.get).collect() === result)
-    assert(blockRDD.dependencies != Nil)
-    assert(blockRDD.partitions.length === numPartitions)
-    assert(blockRDD.partitions.toList === blockRDD.checkpointData.get.getPartitions.toList)
-    assert(blockRDD.collect() === result)
-  }
-
-  test("ShuffledRDD") {
-    testCheckpointing(rdd => {
-      // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
-      new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
-    })
-  }
-
-  test("UnionRDD") {
-    def otherRDD = sc.makeRDD(1 to 10, 1)
-
-    // Test whether the UnionRDD partitions shrink after the parent RDD is checkpointed.
-    // Current implementation of UnionRDD has transient reference to parent RDDs,
-    // so only the partitions will reduce in serialized size, not the RDD.
-    testCheckpointing(_.union(otherRDD), false, true)
-    testParentCheckpointing(_.union(otherRDD), false, true)
-  }
-
-  test("CartesianRDD") {
-    def otherRDD = sc.makeRDD(1 to 10, 1)
-    testCheckpointing(new CartesianRDD(sc, _, otherRDD))
-
-    // Test whether the size of CartesianRDD reduces after its parent RDD is checkpointed.
-    // Only the RDD size is checked here, not the partitions (testRDDPartitionSize = false).
-    // NOTE(review): this comment was originally copied from the CoalescedRDD test;
-    // confirm against CartesianPartition why its partitions are not expected to shrink.
-    testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false)
-
-    // Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after
-    // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
-    // Note that this test is very specific to the current implementation of CartesianRDD.
-    val ones = sc.makeRDD(1 to 100, 10).map(x => x)
-    ones.checkpoint() // checkpoint that MappedRDD
-    val cartesian = new CartesianRDD(sc, ones, ones)
-    val splitBeforeCheckpoint =
-      serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
-    cartesian.count() // do the checkpointing
-    val splitAfterCheckpoint =
-      serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
-    assert(
-      (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) &&
-        (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2),
-      "CartesianRDD.parents not updated after parent RDD checkpointed"
-    )
-  }
-
-  test("CoalescedRDD") {
-    testCheckpointing(_.coalesce(2))
-
-    // Test whether the size of CoalescedRDD reduces after its parent RDD is checkpointed.
-    // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(_.coalesce(2), true, false)
-
-    // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after
-    // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
-    // Note that this test is very specific to the current implementation of CoalescedRDDPartitions
-    val ones = sc.makeRDD(1 to 100, 10).map(x => x)
-    ones.checkpoint() // checkpoint that MappedRDD
-    val coalesced = new CoalescedRDD(ones, 2)
-    val splitBeforeCheckpoint =
-      serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
-    coalesced.count() // do the checkpointing
-    val splitAfterCheckpoint =
-      serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
-    assert(
-      splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head,
-      "CoalescedRDDPartition.parents not updated after parent RDD checkpointed"
-    )
-  }
-
-  test("CoGroupedRDD") {
-    // Each check uses its own long-lineage RDD, so checkpointing in the first
-    // check does not affect the second.
-    val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD()
-    testCheckpointing(rdd => {
-      CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
-    }, false, true)
-
-    val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD()
-    testParentCheckpointing(rdd => {
-      CheckpointSuite.cogroup(
-        longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
-    }, false, true)
-  }
-
-  test("ZippedRDD") {
-    testCheckpointing(
-      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
-
-    // Test whether the size of ZippedRDD reduces after its parent RDD is checkpointed.
-    // Current implementation of ZippedRDDPartitions has transient references to parent RDDs,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(
-      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
-  }
-
-  test("CheckpointRDD with zero partitions") {
-    val rdd = new BlockRDD[Int](sc, Array[String]())
-    assert(rdd.partitions.size === 0)
-    assert(rdd.isCheckpointed === false)
-    rdd.checkpoint()
-    // Running an action must still complete the checkpoint when there is no data.
-    assert(rdd.count() === 0)
-    assert(rdd.isCheckpointed === true)
-    assert(rdd.partitions.size === 0)
-  }
-
-  /**
-   * Test checkpointing of the final RDD generated by the given operation. By default,
-   * this method tests whether the size of serialized RDD has reduced after checkpointing or not.
-   * It can also test whether the size of serialized RDD partitions has reduced after checkpointing or
-   * not, but this is not done by default as usually the partitions do not refer to any RDD and
-   * therefore never store the lineage.
-   *
-   * @param op                   builds the RDD under test from a long-lineage RDD[Int]
-   * @param testRDDSize          assert that the serialized RDD shrinks after checkpointing
-   * @param testRDDPartitionSize assert that the serialized partitions shrink after checkpointing
-   */
-  def testCheckpointing[U: ClassManifest](
-      op: (RDD[Int]) => RDD[U],
-      testRDDSize: Boolean = true,
-      testRDDPartitionSize: Boolean = false
-    ) {
-    // Generate the final RDD using given RDD operation
-    val baseRDD = generateLongLineageRDD()
-    val operatedRDD = op(baseRDD)
-    // NOTE(review): this is the first Dependency object (or null), not an RDD.
-    // The "dependencies have been changed" assertion below therefore compares an
-    // RDD with a Dependency and can never fail; `.rdd` was likely intended.
-    val parentRDD = operatedRDD.dependencies.headOption.orNull
-    val rddType = operatedRDD.getClass.getSimpleName
-    val numPartitions = operatedRDD.partitions.length
-
-    // Find serialized sizes before and after the checkpoint
-    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
-    operatedRDD.checkpoint()
-    val result = operatedRDD.collect()
-    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
-
-    // Test whether the checkpoint file has been created
-    assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
-
-    // Test whether dependencies have been changed from its earlier parent RDD
-    assert(operatedRDD.dependencies.head.rdd != parentRDD)
-
-    // Test whether the partitions have been changed to the new Hadoop partitions
-    assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
-
-    // Test whether the number of partitions is same as before
-    assert(operatedRDD.partitions.length === numPartitions)
-
-    // Test whether the data in the checkpointed RDD is same as original
-    assert(operatedRDD.collect() === result)
-
-    // Test whether serialized size of the RDD has reduced. If the RDD
-    // does not have any dependency to another RDD (e.g., ParallelCollection,
-    // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
-    if (testRDDSize) {
-      logInfo("Size of " + rddType +
-        "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
-      assert(
-        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
-        "Size of " + rddType + " did not reduce after checkpointing " +
-          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
-      )
-    }
-
-    // Test whether serialized size of the partitions has reduced. If the partitions
-    // do not have any non-transient reference to another RDD or another RDD's partitions, they
-    // do not refer to a lineage and therefore may not reduce in size after checkpointing.
-    // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions
-    // must be forgotten after checkpointing (to remove all reference to parent RDDs) and
-    // replaced with the HadoopPartitions of the checkpointed RDD.
-    if (testRDDPartitionSize) {
-      logInfo("Size of " + rddType + " partitions "
-        + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
-      assert(
-        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
-        "Size of " + rddType + " partitions did not reduce after checkpointing " +
-          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
-      )
-    }
-  }
-
-  /**
-   * Test whether checkpointing of the parent of the generated RDD also
-   * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent
-   * RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed,
-   * this RDD will remember the partitions and therefore potentially the whole lineage.
-   *
-   * @param op                   builds the RDD under test from a long-lineage RDD[Int]
-   * @param testRDDSize          assert that the serialized RDD shrinks after its parent is checkpointed
-   * @param testRDDPartitionSize assert that the serialized partitions shrink after its parent is checkpointed
-   */
-  def testParentCheckpointing[U: ClassManifest](
-      op: (RDD[Int]) => RDD[U],
-      testRDDSize: Boolean,
-      testRDDPartitionSize: Boolean
-    ) {
-    // Generate the final RDD using given RDD operation
-    val baseRDD = generateLongLineageRDD()
-    val operatedRDD = op(baseRDD)
-    val parentRDD = operatedRDD.dependencies.head.rdd
-    val rddType = operatedRDD.getClass.getSimpleName
-    val parentRDDType = parentRDD.getClass.getSimpleName
-
-    // Get the partitions and dependencies of the parent in case they're lazily computed
-    parentRDD.dependencies
-    parentRDD.partitions
-
-    // Find serialized sizes before and after the checkpoint
-    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
-    parentRDD.checkpoint()  // checkpoint the parent RDD, not the generated one
-    val result = operatedRDD.collect()
-    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
-
-    // Test whether the data in the checkpointed RDD is same as original
-    assert(operatedRDD.collect() === result)
-
-    // Test whether serialized size of the RDD has reduced because of its parent being
-    // checkpointed. If this RDD or its parent RDD do not have any dependency
-    // to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may
-    // not reduce in size after checkpointing.
-    if (testRDDSize) {
-      assert(
-        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
-        "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType +
-          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
-      )
-    }
-
-    // Test whether serialized size of the partitions has reduced because of its parent being
-    // checkpointed. If the partitions do not have any non-transient reference to another RDD
-    // or another RDD's partitions, they do not refer to a lineage and therefore may not reduce
-    // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent
-    // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's
-    // partitions must have changed after checkpointing.
-    if (testRDDPartitionSize) {
-      assert(
-        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
-        "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType +
-          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
-      )
-    }
-
-  }
-
-  /**
-   * Generate an RDD with a long lineage of one-to-one dependencies:
-   * 50 chained map() calls on a 4-partition RDD of 1 to 100.
-   */
-  def generateLongLineageRDD(): RDD[Int] = {
-    var rdd = sc.makeRDD(1 to 100, 4)
-    for (i <- 1 to 50) {
-      rdd = rdd.map(x => x + 1)
-    }
-    rdd
-  }
-
-  /**
-   * Generate an RDD with a long lineage specifically for CoGroupedRDD.
-   * A CoGroupedRDD can have a long lineage only if one of its parents has a long lineage
-   * and a narrow dependency with this RDD. This method generates such an RDD by a sequence
-   * of cogroups and mapValues which creates a long lineage of narrow dependencies.
-   */
-  def generateLongLineageRDDForCoGroupedRDD() = {
-    // Sums all values from both cogrouped sides into a single Int.
-    val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
-
-    // A def, so each use builds a new RDD partitioned by `partitioner`.
-    def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
-
-    var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones)
-    for(i <- 1 to 10) {
-      cogrouped = cogrouped.mapValues(add).cogroup(ones)
-    }
-    cogrouped.mapValues(add)
-  }
-
-  /**
-   * Get serialized sizes of the RDD and its partitions, in order to test whether the size shrinks
-   * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
-   *
-   * @return (serialized RDD size minus serialized checkpointData size, serialized partitions size)
-   */
-  def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
-    (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length,
-     Utils.serialize(rdd.partitions).length)
-  }
-
-  /**
-   * Serialize and deserialize an object. This is useful to verify the objects
-   * contents after deserialization (e.g., the contents of an RDD split after
-   * it is sent to a slave along with a task)
-   */
-  def serializeDeserialize[T](obj: T): T = {
-    val bytes = Utils.serialize(obj)
-    Utils.deserialize[T](bytes)
-  }
-}
-
-
-/** Helpers shared by CheckpointSuite tests. */
-object CheckpointSuite {
-  // This is a custom cogroup function that does not use mapValues like
-  // the PairRDDFunctions.cogroup()
-  // It constructs a CoGroupedRDD directly over `first` and `second` using `part`.
-  // The final asInstanceOf is an unchecked cast (generic types are erased).
-  def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
-    //println("First = " + first + ", second = " + second)
-    new CoGroupedRDD[K](
-      Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
-      part
-    ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ClosureCleanerSuite.scala b/core/src/test/scala/spark/ClosureCleanerSuite.scala
deleted file mode 100644
index 7d2831e..0000000
--- a/core/src/test/scala/spark/ClosureCleanerSuite.scala
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io.NotSerializableException
-
-import org.scalatest.FunSuite
-import spark.LocalSparkContext._
-import SparkContext._
-
-class ClosureCleanerSuite extends FunSuite {
-  // Every non-nested fixture adds 5 to each element of Array(1, 2, 3, 4) and sums the results.
-  private val expectedFlatSum = 6 + 7 + 8 + 9
-
-  // Nested fixtures run one job per i in 1 to 4, adding i and y = 1 to each element of
-  // Array(1, 2, 3, 4), then total the four job results.
-  private val expectedNestedSum = 4 * (1 + 2 + 3 + 4) + 4 * (1 + 2 + 3 + 4) + 16 * 1
-
-  test("closures inside an object") {
-    assert(TestObject.run() === expectedFlatSum)
-  }
-
-  test("closures inside a class") {
-    assert(new TestClass().run() === expectedFlatSum)
-  }
-
-  test("closures inside a class with no default constructor") {
-    assert(new TestClassWithoutDefaultConstructor(5).run() === expectedFlatSum)
-  }
-
-  test("closures that don't use fields of the outer class") {
-    assert(new TestClassWithoutFieldAccess().run() === expectedFlatSum)
-  }
-
-  test("nested closures inside an object") {
-    assert(TestObjectWithNesting.run() === expectedNestedSum)
-  }
-
-  test("nested closures inside a class") {
-    assert(new TestClassWithNesting(1).run() === expectedNestedSum)
-  }
-}
-
-// Deliberately not Serializable. The closure tests allocate instances of it alongside their
-// closures to check that closures don't keep references to unneeded outer variables.
-class NonSerializable
-
-object TestObject {
-  /**
-   * Sums (n + x) over Array(1, 2, 3, 4) in a local Spark job, where `x` is a local of this
-   * method; with x = 5 the result is 6 + 7 + 8 + 9 = 30.
-   */
-  def run(): Int = {
-    // Never referenced by the closure; it must not be dragged along with it.
-    var nonSer = new NonSerializable
-    // Kept as a `var` on purpose: the test exercises closures that read a mutable local.
-    var x = 5
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + x).reduce(_ + _)
-    }
-  }
-}
-
-class TestClass extends Serializable {
-  var x = 5
-
-  def getX = x
-
-  /**
-   * Sums (n + getX) over Array(1, 2, 3, 4) in a local Spark job. The closure calls getX on
-   * this instance, so it has to carry a reference to it; with x = 5 the result is 30.
-   */
-  def run(): Int = {
-    // Never referenced by the closure; it must not be dragged along with it.
-    var nonSer = new NonSerializable
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + getX).reduce(_ + _)
-    }
-  }
-}
-
-class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
-  def getX = x
-
-  /**
-   * Sums (n + getX) over Array(1, 2, 3, 4) in a local Spark job. The class only has a
-   * one-argument constructor, and the closure reaches getX through this instance.
-   */
-  def run(): Int = {
-    // Never referenced by the closure; it must not be dragged along with it.
-    var nonSer = new NonSerializable
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + getX).reduce(_ + _)
-    }
-  }
-}
-
-// This class is not serializable, but its closures never touch its fields, so they carry no
-// $outer reference to it and the job should still run.
-class TestClassWithoutFieldAccess {
-  var nonSer = new NonSerializable
-
-  /** Sums (n + x) over Array(1, 2, 3, 4) with a method-local x = 5; the result is 30. */
-  def run(): Int = {
-    // Never referenced by the closure; it must not be dragged along with it.
-    var nonSer2 = new NonSerializable
-    // Kept as a `var` on purpose: the test exercises closures that read a mutable local.
-    var x = 5
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      nums.map(_ + x).reduce(_ + _)
-    }
-  }
-}
-
-
-object TestObjectWithNesting {
-  /**
-   * Runs one local job per i in 1 to 4, each summing (n + i + y) with y = 1 over
-   * Array(1, 2, 3, 4), and returns the total of the four results (96).
-   */
-  def run(): Int = {
-    // Never referenced by any closure; it must not be captured.
-    var nonSer = new NonSerializable
-    var answer = 0
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      var y = 1
-      // The mutable loop is intentional: the inner closure reads vars from two enclosing
-      // scopes, which is exactly what this fixture exercises.
-      for (i <- 1 to 4) {
-        var nonSer2 = new NonSerializable
-        var x = i
-        answer += nums.map(_ + x + y).reduce(_ + _)
-      }
-      answer
-    }
-  }
-}
-
-class TestClassWithNesting(val y: Int) extends Serializable {
-  def getY = y
-
-  /**
-   * Runs one local job per i in 1 to 4, each summing (n + i + getY) over Array(1, 2, 3, 4),
-   * and returns the total of the four results.
-   */
-  def run(): Int = {
-    // Never referenced by any closure; it must not be captured.
-    var nonSer = new NonSerializable
-    var answer = 0
-    withSpark(new SparkContext("local", "test")) { sc =>
-      val nums = sc.parallelize(Array(1, 2, 3, 4))
-      // The mutable loop is intentional: the inner closure reads a loop-local var and calls
-      // getY on this instance, which is exactly what this fixture exercises.
-      for (i <- 1 to 4) {
-        var nonSer2 = new NonSerializable
-        var x = i
-        answer += nums.map(_ + x + getY).reduce(_ + _)
-      }
-      answer
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
deleted file mode 100644
index e11efe4..0000000
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import network.ConnectionManagerId
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-import org.scalatest.concurrent.Timeouts._
-import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.prop.Checkers
-import org.scalatest.time.{Span, Millis}
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
-import org.eclipse.jetty.server.{Server, Request, Handler}
-
-import com.google.common.io.Files
-
-import scala.collection.mutable.ArrayBuffer
-
-import SparkContext._
-import storage.{GetBlock, BlockManagerWorker, StorageLevel}
-import ui.JettyUtils
-
-
-// A class that is not Serializable, and a Throwable that holds one in a field. The
-// "task throws not serializable exception" test throws the latter from inside a task.
-class NotSerializableClass
-class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable
-
-
-class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
-  with LocalSparkContext {
-
-  val clusterUrl = "local-cluster[2,1,512]"
-
-  after {
-    // Some tests override these properties; clearing them here (which runs even when a test
-    // fails part-way) keeps them from leaking into later tests.
-    System.clearProperty("spark.reducer.maxMbInFlight")
-    System.clearProperty("spark.storage.memoryFraction")
-  }
-
-  test("task throws not serializable exception") {
-    // Ensures that executors do not crash when an exn is not serializable. If executors crash,
-    // this test will hang. Correct behavior is that executors don't crash but fail tasks
-    // and the scheduler throws a SparkException.
-
-    // numSlaves must be less than numPartitions
-    val numSlaves = 3
-    val numPartitions = 10
-
-    sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
-    val data = sc.parallelize(1 to 100, numPartitions).
-      map(x => throw new NotSerializableExn(new NotSerializableClass))
-    intercept[SparkException] {
-      data.count()
-    }
-    resetSparkContext()
-  }
-
-  test("local-cluster format") {
-    // The local-cluster master URL should be accepted with or without whitespace inside
-    // the brackets.
-    val urls = Seq(
-      "local-cluster[2,1,512]",
-      "local-cluster[2 , 1 , 512]",
-      "local-cluster[2, 1, 512]",
-      "local-cluster[ 2, 1, 512 ]")
-    for (url <- urls) {
-      sc = new SparkContext(url, "test")
-      assert(sc.parallelize(1 to 2, 2).count() == 2)
-      resetSparkContext()
-    }
-  }
-
-  test("simple groupByKey") {
-    sc = new SparkContext(clusterUrl, "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
-    val groups = pairs.groupByKey(5).collect()
-    assert(groups.size === 2)
-    val valuesFor1 = groups.find(_._1 == 1).get._2
-    assert(valuesFor1.toList.sorted === List(1, 2, 3))
-    val valuesFor2 = groups.find(_._1 == 2).get._2
-    assert(valuesFor2.toList.sorted === List(1))
-  }
-
-  test("groupByKey where map output sizes exceed maxMbInFlight") {
-    System.setProperty("spark.reducer.maxMbInFlight", "1")
-    sc = new SparkContext(clusterUrl, "test")
-    // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
-    // file should be about 2.5 MB
-    val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
-    val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
-    assert(groups.length === 16)
-    assert(groups.map(_._2).sum === 2000)
-    // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
-  }
-
-  test("accumulators") {
-    sc = new SparkContext(clusterUrl, "test")
-    val accum = sc.accumulator(0)
-    sc.parallelize(1 to 10, 10).foreach(x => accum += x)
-    assert(accum.value === 55)
-  }
-
-  test("broadcast variables") {
-    sc = new SparkContext(clusterUrl, "test")
-    val array = new Array[Int](100)
-    val bv = sc.broadcast(array)
-    array(2) = 3 // Change the array -- this should not be seen on workers
-    val rdd = sc.parallelize(1 to 10, 10)
-    val sum = rdd.map(x => bv.value.sum).reduce(_ + _)
-    assert(sum === 0)
-  }
-
-  test("repeatedly failing task") {
-    sc = new SparkContext(clusterUrl, "test")
-    val thrown = intercept[SparkException] {
-      sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
-    }
-    assert(thrown.getClass === classOf[SparkException])
-    assert(thrown.getMessage.contains("more than 4 times"))
-  }
-
-  test("caching") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).cache()
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching on disk") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching in memory, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching in memory, serialized, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching on disk, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching in memory and disk, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-  }
-
-  test("caching in memory and disk, serialized, replicated") {
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
-
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-    assert(data.count() === 1000)
-
-    // Get all the locations of the first partition and try to fetch the partition
-    // from each of those locations.
-    val blockId = "rdd_%d_%d".format(data.id, 0)
-    val blockManager = SparkEnv.get.blockManager
-    val locations = blockManager.master.getLocations(blockId)
-    // Without this, an empty location list would let the checks below pass vacuously.
-    assert(locations.size > 0, "no locations reported for " + blockId)
-    locations.foreach { id =>
-      val bytes = BlockManagerWorker.syncGetBlock(
-        GetBlock(blockId), ConnectionManagerId(id.host, id.port))
-      val deserialized =
-        blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList
-      assert(deserialized === (1 to 100).toList)
-    }
-  }
-
-  test("compute without caching when no partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.0001")
-    sc = new SparkContext(clusterUrl, "test")
-    // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
-    // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
-    val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
-    assert(data.count() === 4000000)
-    assert(data.count() === 4000000)
-    assert(data.count() === 4000000)
-    // spark.storage.memoryFraction is cleared in the test suite's after{} block
-  }
-
-  test("compute when only some partitions fit in memory") {
-    System.setProperty("spark.storage.memoryFraction", "0.01")
-    sc = new SparkContext(clusterUrl, "test")
-    // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
-    // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
-    // to make sure that *some* of them do fit though
-    val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
-    assert(data.count() === 4000000)
-    assert(data.count() === 4000000)
-    assert(data.count() === 4000000)
-    // spark.storage.memoryFraction is cleared in the test suite's after{} block
-  }
-
-  test("passing environment variables to cluster") {
-    sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE"))
-    val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
-    assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
-  }
-
-  test("recover from node failures") {
-    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
-    DistributedSuite.amMaster = true
-    sc = new SparkContext(clusterUrl, "test")
-    val data = sc.parallelize(Seq(true, true), 2)
-    assert(data.count === 2) // force executors to start
-    assert(data.map(markNodeIfIdentity).collect.size === 2)
-    assert(data.map(failOnMarkedIdentity).collect.size === 2)
-  }
-
-  test("recover from repeated node failures during shuffle-map") {
-    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
-    DistributedSuite.amMaster = true
-    sc = new SparkContext(clusterUrl, "test")
-    for (_ <- 1 to 3) {
-      val data = sc.parallelize(Seq(true, false), 2)
-      assert(data.count === 2)
-      assert(data.map(markNodeIfIdentity).collect.size === 2)
-      assert(data.map(failOnMarkedIdentity).map(x => x -> x).groupByKey.count === 2)
-    }
-  }
-
-  test("recover from repeated node failures during shuffle-reduce") {
-    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
-    DistributedSuite.amMaster = true
-    sc = new SparkContext(clusterUrl, "test")
-    for (_ <- 1 to 3) {
-      val data = sc.parallelize(Seq(true, true), 2)
-      assert(data.count === 2)
-      assert(data.map(markNodeIfIdentity).collect.size === 2)
-      // This relies on mergeCombiners being used to perform the actual reduce for this
-      // test to actually be testing what it claims.
-      val grouped = data.map(x => x -> x).combineByKey(
-        x => x,
-        (x: Boolean, y: Boolean) => x,
-        (x: Boolean, y: Boolean) => failOnMarkedIdentity(x)
-      )
-      assert(grouped.collect.size === 1)
-    }
-  }
-
-  test("recover from node failures with replication") {
-    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
-    DistributedSuite.amMaster = true
-    // Using more than two nodes so we don't have a symmetric communication pattern and might
-    // cache a partially correct list of peers.
-    sc = new SparkContext("local-cluster[3,1,512]", "test")
-    for (_ <- 1 to 3) {
-      val data = sc.parallelize(Seq(true, false, false, false), 4)
-      data.persist(StorageLevel.MEMORY_ONLY_2)
-
-      assert(data.count === 4)
-      assert(data.map(markNodeIfIdentity).collect.size === 4)
-      assert(data.map(failOnMarkedIdentity).collect.size === 4)
-
-      // Create a new replicated RDD to make sure that cached peer information doesn't cause
-      // problems.
-      val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
-      assert(data2.count === 2)
-    }
-  }
-
-  test("unpersist RDDs") {
-    DistributedSuite.amMaster = true
-    sc = new SparkContext("local-cluster[3,1,512]", "test")
-    val data = sc.parallelize(Seq(true, false, false, false), 4)
-    data.persist(StorageLevel.MEMORY_ONLY_2)
-    data.count
-    assert(sc.persistentRdds.isEmpty === false)
-    data.unpersist()
-    assert(sc.persistentRdds.isEmpty === true)
-
-    // Poll the driver until it reports no cached RDDs. The block manager may be racing this
-    // thread to remove entries from the driver, so a failed query counts as "not cleared
-    // yet" and is retried instead of ending the wait early. Interrupts are rethrown so that
-    // failAfter can still abort the loop on timeout.
-    failAfter(Span(3000, Millis)) {
-      var cleared = false
-      while (!cleared) {
-        cleared =
-          try {
-            sc.getRDDStorageInfo.isEmpty
-          } catch {
-            case e: InterruptedException => throw e
-            case e: Exception => false
-          }
-        if (!cleared) {
-          Thread.sleep(200)
-        }
-      }
-    }
-  }
-
-  test("job should fail if TaskResult exceeds Akka frame size") {
-    // We must use local-cluster mode since results are returned differently
-    // when running under LocalScheduler:
-    sc = new SparkContext("local-cluster[1,1,512]", "test")
-    val akkaFrameSize =
-      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
-    val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
-    val exception = intercept[SparkException] {
-      rdd.reduce((x, y) => x)
-    }
-    exception.getMessage should endWith("result exceeded Akka frame size")
-  }
-}
-
-object DistributedSuite {
-  // Set once markNodeIfIdentity has seen `true` in this JVM; failOnMarkedIdentity then
-  // kills the JVM.
-  var mark = false
-
-  // Tests set this to true in the driver program, letting markNodeIfIdentity assert that it
-  // is not running there.
-  var amMaster = false
-
-  // Returns `item` unchanged. A `true` argument also asserts that this is not the driver
-  // and marks this JVM for failure.
-  def markNodeIfIdentity(item: Boolean): Boolean = {
-    val shouldMark = item
-    if (shouldMark) {
-      assert(!amMaster)
-      mark = true
-    }
-    shouldMark
-  }
-
-  // Returns `item` unchanged, unless this JVM was marked earlier, in which case the whole
-  // JVM exits with status 42.
-  def failOnMarkedIdentity(item: Boolean): Boolean = {
-    if (mark) System.exit(42)
-    item
-  }
-}