Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:56 UTC

[12/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
new file mode 100644
index 0000000..4434f3b
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import collection.mutable
+import java.util.Random
+import scala.math.exp
+import scala.math.signum
+import org.apache.spark.SparkContext._
+
+class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
+
+  test ("basic accumulation"){
+    sc = new SparkContext("local", "test")
+    val acc : Accumulator[Int] = sc.accumulator(0)
+
+    val d = sc.parallelize(1 to 20)
+    d.foreach{x => acc += x}
+    acc.value should be (210)
+
+
+    val longAcc = sc.accumulator(0l)
+    val maxInt = Integer.MAX_VALUE.toLong
+    d.foreach{x => longAcc += maxInt + x}
+    longAcc.value should be (210l + maxInt * 20)
+  }
+
+  test ("value not assignable from tasks") {
+    sc = new SparkContext("local", "test")
+    val acc : Accumulator[Int] = sc.accumulator(0)
+
+    val d = sc.parallelize(1 to 20)
+    evaluating {d.foreach{x => acc.value = x}} should produce [Exception]
+  }
+
+  test ("add value to collection accumulators") {
+    import SetAccum._
+    val maxI = 1000
+    for (nThreads <- List(1, 10)) { //test single & multi-threaded
+      sc = new SparkContext("local[" + nThreads + "]", "test")
+      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+      val d = sc.parallelize(1 to maxI)
+      d.foreach {
+        x => acc += x
+      }
+      val v = acc.value.asInstanceOf[mutable.Set[Int]]
+      for (i <- 1 to maxI) {
+        v should contain(i)
+      }
+      resetSparkContext()
+    }
+  }
+
+  implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
+    def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
+      t1 ++= t2
+      t1
+    }
+    def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
+      t1 += t2
+      t1
+    }
+    def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
+      new mutable.HashSet[Any]()
+    }
+  }
+
+  test ("value not readable in tasks") {
+    import SetAccum._
+    val maxI = 1000
+    for (nThreads <- List(1, 10)) { //test single & multi-threaded
+      sc = new SparkContext("local[" + nThreads + "]", "test")
+      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+      val d = sc.parallelize(1 to maxI)
+      evaluating {
+        d.foreach {
+          x => acc.value += x
+        }
+      } should produce [SparkException]
+      resetSparkContext()
+    }
+  }
+
+  test ("collection accumulators") {
+    val maxI = 1000
+    for (nThreads <- List(1, 10)) {
+      // test single & multi-threaded
+      sc = new SparkContext("local[" + nThreads + "]", "test")
+      val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
+      val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
+      val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
+      val d = sc.parallelize((1 to maxI) ++ (1 to maxI))
+      d.foreach {
+        x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)}
+      }
+
+      // Note that this is typed correctly -- no casts necessary
+      setAcc.value.size should be (maxI)
+      bufferAcc.value.size should be (2 * maxI)
+      mapAcc.value.size should be (maxI)
+      for (i <- 1 to maxI) {
+        setAcc.value should contain(i)
+        bufferAcc.value should contain(i)
+        mapAcc.value should contain (i -> i.toString)
+      }
+      resetSparkContext()
+    }
+  }
+
+  test ("localValue readable in tasks") {
+    import SetAccum._
+    val maxI = 1000
+    for (nThreads <- List(1, 10)) { //test single & multi-threaded
+      sc = new SparkContext("local[" + nThreads + "]", "test")
+      val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+      val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
+      val d = sc.parallelize(groupedInts)
+      d.foreach {
+        x => acc.localValue ++= x
+      }
+      acc.value should be ( (0 to maxI).toSet)
+      resetSparkContext()
+    }
+  }
+
+}
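
For reference, a minimal sketch of the accumulator API the suite above exercises; the master string and names are illustrative and the snippet is not part of this diff:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    val sc = new SparkContext("local", "accumulator-example")
    val sum = sc.accumulator(0)                        // Int accumulator, zero-initialized on the driver
    sc.parallelize(1 to 20).foreach(x => sum += x)     // tasks may only add to it
    println(sum.value)                                 // only the driver reads the merged value: 210
    sc.stop()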

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
new file mode 100644
index 0000000..b3a53d9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+class BroadcastSuite extends FunSuite with LocalSparkContext {
+  
+  test("basic broadcast") {
+    sc = new SparkContext("local", "test")
+    val list = List(1, 2, 3, 4)
+    val listBroadcast = sc.broadcast(list)
+    val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
+    assert(results.collect.toSet === Set((1, 10), (2, 10)))
+  }
+
+  test("broadcast variables accessed in multiple threads") {
+    sc = new SparkContext("local[10]", "test")
+    val list = List(1, 2, 3, 4)
+    val listBroadcast = sc.broadcast(list)
+    val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
+    assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
+  }
+}
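
For reference, a minimal sketch of the broadcast-variable pattern tested above; the lookup table is illustrative and the snippet is not part of this diff:

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local[2]", "broadcast-example")
    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))   // shipped to each executor once
    val named = sc.parallelize(Seq(1, 2, 1)).map(i => lookup.value.getOrElse(i, "?"))
    println(named.collect().mkString(", "))                  // one, two, one
    sc.stop()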

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
new file mode 100644
index 0000000..23b14f4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+import java.io.File
+import org.apache.spark.rdd._
+import org.apache.spark.SparkContext._
+import storage.StorageLevel
+
+class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
+  initLogging()
+
+  var checkpointDir: File = _
+  val partitioner = new HashPartitioner(2)
+
+  override def beforeEach() {
+    super.beforeEach()
+    checkpointDir = File.createTempFile("temp", "")
+    checkpointDir.delete()
+    sc = new SparkContext("local", "test")
+    sc.setCheckpointDir(checkpointDir.toString)
+  }
+
+  override def afterEach() {
+    super.afterEach()
+    if (checkpointDir != null) {
+      checkpointDir.delete()
+    }
+  }
+
+  test("basic checkpointing") {
+    val parCollection = sc.makeRDD(1 to 4)
+    val flatMappedRDD = parCollection.flatMap(x => 1 to x)
+    flatMappedRDD.checkpoint()
+    assert(flatMappedRDD.dependencies.head.rdd == parCollection)
+    val result = flatMappedRDD.collect()
+    assert(flatMappedRDD.dependencies.head.rdd != parCollection)
+    assert(flatMappedRDD.collect() === result)
+  }
+
+  test("RDDs with one-to-one dependencies") {
+    testCheckpointing(_.map(x => x.toString))
+    testCheckpointing(_.flatMap(x => 1 to x))
+    testCheckpointing(_.filter(_ % 2 == 0))
+    testCheckpointing(_.sample(false, 0.5, 0))
+    testCheckpointing(_.glom())
+    testCheckpointing(_.mapPartitions(_.map(_.toString)))
+    testCheckpointing(r => new MapPartitionsWithIndexRDD(r,
+      (i: Int, iter: Iterator[Int]) => iter.map(_.toString), false ))
+    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
+    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
+    testCheckpointing(_.pipe(Seq("cat")))
+  }
+
+  test("ParallelCollection") {
+    val parCollection = sc.makeRDD(1 to 4, 2)
+    val numPartitions = parCollection.partitions.size
+    parCollection.checkpoint()
+    assert(parCollection.dependencies === Nil)
+    val result = parCollection.collect()
+    assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result)
+    assert(parCollection.dependencies != Nil)
+    assert(parCollection.partitions.length === numPartitions)
+    assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList)
+    assert(parCollection.collect() === result)
+  }
+
+  test("BlockRDD") {
+    val blockId = "id"
+    val blockManager = SparkEnv.get.blockManager
+    blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY)
+    val blockRDD = new BlockRDD[String](sc, Array(blockId))
+    val numPartitions = blockRDD.partitions.size
+    blockRDD.checkpoint()
+    val result = blockRDD.collect()
+    assert(sc.checkpointFile[String](blockRDD.getCheckpointFile.get).collect() === result)
+    assert(blockRDD.dependencies != Nil)
+    assert(blockRDD.partitions.length === numPartitions)
+    assert(blockRDD.partitions.toList === blockRDD.checkpointData.get.getPartitions.toList)
+    assert(blockRDD.collect() === result)
+  }
+
+  test("ShuffledRDD") {
+    testCheckpointing(rdd => {
+      // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
+      new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
+    })
+  }
+
+  test("UnionRDD") {
+    def otherRDD = sc.makeRDD(1 to 10, 1)
+
+    // Test whether UnionRDD's partitions shrink in serialized size after the parent RDD is checkpointed.
+    // Current implementation of UnionRDD has transient reference to parent RDDs,
+    // so only the partitions will reduce in serialized size, not the RDD.
+    testCheckpointing(_.union(otherRDD), false, true)
+    testParentCheckpointing(_.union(otherRDD), false, true)
+  }
+
+  test("CartesianRDD") {
+    def otherRDD = sc.makeRDD(1 to 10, 1)
+    testCheckpointing(new CartesianRDD(sc, _, otherRDD))
+
+    // Test whether the CoalescedRDD shrinks in serialized size after the parent RDD is checkpointed
+    // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
+    // so only the RDD will reduce in serialized size, not the partitions.
+    testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false)
+
+    // Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after
+    // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
+    // Note that this test is very specific to the current implementation of CartesianRDD.
+    val ones = sc.makeRDD(1 to 100, 10).map(x => x)
+    ones.checkpoint() // checkpoint that MappedRDD
+    val cartesian = new CartesianRDD(sc, ones, ones)
+    val splitBeforeCheckpoint =
+      serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
+    cartesian.count() // do the checkpointing
+    val splitAfterCheckpoint =
+      serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
+    assert(
+      (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) &&
+        (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2),
+      "CartesianRDD.parents not updated after parent RDD checkpointed"
+    )
+  }
+
+  test("CoalescedRDD") {
+    testCheckpointing(_.coalesce(2))
+
+    // Test whether the CoalescedRDD shrinks in serialized size after the parent RDD is checkpointed
+    // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
+    // so only the RDD will reduce in serialized size, not the partitions.
+    testParentCheckpointing(_.coalesce(2), true, false)
+
+    // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after
+    // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
+    // Note that this test is very specific to the current implementation of CoalescedRDDPartitions
+    val ones = sc.makeRDD(1 to 100, 10).map(x => x)
+    ones.checkpoint() // checkpoint that MappedRDD
+    val coalesced = new CoalescedRDD(ones, 2)
+    val splitBeforeCheckpoint =
+      serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
+    coalesced.count() // do the checkpointing
+    val splitAfterCheckpoint =
+      serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
+    assert(
+      splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head,
+      "CoalescedRDDPartition.parents not updated after parent RDD checkpointed"
+    )
+  }
+
+  test("CoGroupedRDD") {
+    val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD()
+    testCheckpointing(rdd => {
+      CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
+    }, false, true)
+
+    val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD()
+    testParentCheckpointing(rdd => {
+      CheckpointSuite.cogroup(
+        longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
+    }, false, true)
+  }
+
+  test("ZippedRDD") {
+    testCheckpointing(
+      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
+
+    // Test whether the ZippedRDD shrinks in serialized size after the parent RDD is checkpointed
+    // Current implementation of ZippedRDDPartitions has transient references to parent RDDs,
+    // so only the RDD will reduce in serialized size, not the partitions.
+    testParentCheckpointing(
+      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
+  }
+
+  test("CheckpointRDD with zero partitions") {
+    val rdd = new BlockRDD[Int](sc, Array[String]())
+    assert(rdd.partitions.size === 0)
+    assert(rdd.isCheckpointed === false)
+    rdd.checkpoint()
+    assert(rdd.count() === 0)
+    assert(rdd.isCheckpointed === true)
+    assert(rdd.partitions.size === 0)
+  }
+
+  /**
+   * Test checkpointing of the final RDD generated by the given operation. By default,
+   * this method tests whether the serialized size of the RDD has decreased after checkpointing.
+   * It can also test whether the serialized size of the RDD's partitions has decreased, but this
+   * is not done by default, as usually the partitions do not refer to any RDD and therefore
+   * never store the lineage.
+   */
+  def testCheckpointing[U: ClassManifest](
+      op: (RDD[Int]) => RDD[U],
+      testRDDSize: Boolean = true,
+      testRDDPartitionSize: Boolean = false
+    ) {
+    // Generate the final RDD using given RDD operation
+    val baseRDD = generateLongLineageRDD()
+    val operatedRDD = op(baseRDD)
+    val parentRDD = operatedRDD.dependencies.headOption.orNull
+    val rddType = operatedRDD.getClass.getSimpleName
+    val numPartitions = operatedRDD.partitions.length
+
+    // Find serialized sizes before and after the checkpoint
+    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+    operatedRDD.checkpoint()
+    val result = operatedRDD.collect()
+    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+
+    // Test whether the checkpoint file has been created
+    assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
+
+    // Test whether dependencies have been changed from its earlier parent RDD
+    assert(operatedRDD.dependencies.head.rdd != parentRDD)
+
+    // Test whether the partitions have been changed to the new Hadoop partitions
+    assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
+
+    // Test whether the number of partitions is same as before
+    assert(operatedRDD.partitions.length === numPartitions)
+
+    // Test whether the data in the checkpointed RDD is same as original
+    assert(operatedRDD.collect() === result)
+
+    // Test whether serialized size of the RDD has reduced. If the RDD
+    // does not have any dependency to another RDD (e.g., ParallelCollection,
+    // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
+    if (testRDDSize) {
+      logInfo("Size of " + rddType +
+        "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
+      assert(
+        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
+        "Size of " + rddType + " did not reduce after checkpointing " +
+          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
+      )
+    }
+
+    // Test whether serialized size of the partitions has reduced. If the partitions
+    // do not have any non-transient reference to another RDD or another RDD's partitions, it
+    // does not refer to a lineage and therefore may not reduce in size after checkpointing.
+    // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions
+    // must be forgotten after checkpointing (to remove all reference to parent RDDs) and
+    // replaced with the HadoopPartitions of the checkpointed RDD.
+    if (testRDDPartitionSize) {
+      logInfo("Size of " + rddType + " partitions "
+        + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
+      assert(
+        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
+        "Size of " + rddType + " partitions did not reduce after checkpointing " +
+          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
+      )
+    }
+  }
+
+  /**
+   * Test whether checkpointing of the parent of the generated RDD also
+   * truncates the lineage or not. Some RDDs, like CoGroupedRDD, hold on to the partitions of
+   * their parent RDDs. So even if the parent RDD is checkpointed and its partitions changed,
+   * such an RDD will remember the old partitions and therefore potentially the whole lineage.
+   */
+  def testParentCheckpointing[U: ClassManifest](
+      op: (RDD[Int]) => RDD[U],
+      testRDDSize: Boolean,
+      testRDDPartitionSize: Boolean
+    ) {
+    // Generate the final RDD using given RDD operation
+    val baseRDD = generateLongLineageRDD()
+    val operatedRDD = op(baseRDD)
+    val parentRDD = operatedRDD.dependencies.head.rdd
+    val rddType = operatedRDD.getClass.getSimpleName
+    val parentRDDType = parentRDD.getClass.getSimpleName
+
+    // Get the partitions and dependencies of the parent in case they're lazily computed
+    parentRDD.dependencies
+    parentRDD.partitions
+
+    // Find serialized sizes before and after the checkpoint
+    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+    parentRDD.checkpoint()  // checkpoint the parent RDD, not the generated one
+    val result = operatedRDD.collect()
+    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+
+    // Test whether the data in the checkpointed RDD is same as original
+    assert(operatedRDD.collect() === result)
+
+    // Test whether serialized size of the RDD has reduced because of its parent being
+    // checkpointed. If this RDD or its parent RDD do not have any dependency
+    // to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may
+    // not reduce in size after checkpointing.
+    if (testRDDSize) {
+      assert(
+        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
+        "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType +
+          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
+      )
+    }
+
+    // Test whether serialized size of the partitions has reduced because of its parent being
+    // checkpointed. If the partitions do not have any non-transient reference to another RDD
+    // or another RDD's partitions, it does not refer to a lineage and therefore may not reduce
+    // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent
+    // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's
+    // partitions must have changed after checkpointing.
+    if (testRDDPartitionSize) {
+      assert(
+        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
+        "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType +
+          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
+      )
+    }
+
+  }
+
+  /**
+   * Generate an RDD with a long lineage of one-to-one dependencies.
+   */
+  def generateLongLineageRDD(): RDD[Int] = {
+    var rdd = sc.makeRDD(1 to 100, 4)
+    for (i <- 1 to 50) {
+      rdd = rdd.map(x => x + 1)
+    }
+    rdd
+  }
+
+  /**
+   * Generate an RDD with a long lineage specifically for CoGroupedRDD.
+   * A CoGroupedRDD can have a long lineage only if one of its parents has a long lineage
+   * and a narrow dependency with this RDD. This method generates such an RDD through a
+   * sequence of cogroups and mapValues, which creates a long lineage of narrow dependencies.
+   */
+  def generateLongLineageRDDForCoGroupedRDD() = {
+    val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
+
+    def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
+
+    var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones)
+    for(i <- 1 to 10) {
+      cogrouped = cogrouped.mapValues(add).cogroup(ones)
+    }
+    cogrouped.mapValues(add)
+  }
+
+  /**
+   * Get serialized sizes of the RDD and its partitions, in order to test whether the size shrinks
+   * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
+   */
+  def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
+    (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length,
+     Utils.serialize(rdd.partitions).length)
+  }
+
+  /**
+   * Serialize and deserialize an object. This is useful to verify the objects
+   * contents after deserialization (e.g., the contents of an RDD split after
+   * it is sent to a slave along with a task)
+   */
+  def serializeDeserialize[T](obj: T): T = {
+    val bytes = Utils.serialize(obj)
+    Utils.deserialize[T](bytes)
+  }
+}
+
+
+object CheckpointSuite {
+  // This is a custom cogroup function that does not use mapValues like
+  // the PairRDDFunctions.cogroup()
+  def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
+    //println("First = " + first + ", second = " + second)
+    new CoGroupedRDD[K](
+      Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
+      part
+    ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
+  }
+
+}
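
For reference, a minimal sketch of the checkpointing flow these tests verify; the checkpoint directory is illustrative and the snippet is not part of this diff:

    import org.apache.spark.SparkContext

    val sc = new SparkContext("local", "checkpoint-example")
    sc.setCheckpointDir("/tmp/spark-checkpoint-example")     // illustrative path
    val rdd = sc.makeRDD(1 to 1000, 4).map(_ * 2).filter(_ % 3 == 0)
    rdd.checkpoint()           // mark the RDD; nothing is written until an action runs
    rdd.count()                // first action writes the checkpoint and truncates the lineage
    println(rdd.dependencies)  // now backed by the checkpoint data rather than the original parents
    sc.stop()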

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala
new file mode 100644
index 0000000..8494899
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.NotSerializableException
+
+import org.scalatest.FunSuite
+import org.apache.spark.LocalSparkContext._
+import SparkContext._
+
+class ClosureCleanerSuite extends FunSuite {
+  test("closures inside an object") {
+    assert(TestObject.run() === 30) // 6 + 7 + 8 + 9
+  }
+
+  test("closures inside a class") {
+    val obj = new TestClass
+    assert(obj.run() === 30) // 6 + 7 + 8 + 9
+  }
+
+  test("closures inside a class with no default constructor") {
+    val obj = new TestClassWithoutDefaultConstructor(5)
+    assert(obj.run() === 30) // 6 + 7 + 8 + 9
+  }
+
+  test("closures that don't use fields of the outer class") {
+    val obj = new TestClassWithoutFieldAccess
+    assert(obj.run() === 30) // 6 + 7 + 8 + 9
+  }
+
+  test("nested closures inside an object") {
+    assert(TestObjectWithNesting.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
+  }
+
+  test("nested closures inside a class") {
+    val obj = new TestClassWithNesting(1)
+    assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
+  }
+}
+
+// A non-serializable class we create in closures to make sure that we aren't
+// keeping references to unneeded variables from our outer closures.
+class NonSerializable {}
+
+object TestObject {
+  def run(): Int = {
+    var nonSer = new NonSerializable
+    var x = 5
+    return withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      nums.map(_ + x).reduce(_ + _)
+    }
+  }
+}
+
+class TestClass extends Serializable {
+  var x = 5
+  
+  def getX = x
+
+  def run(): Int = {
+    var nonSer = new NonSerializable
+    return withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      nums.map(_ + getX).reduce(_ + _)
+    }
+  }
+}
+
+class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
+  def getX = x
+
+  def run(): Int = {
+    var nonSer = new NonSerializable
+    return withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      nums.map(_ + getX).reduce(_ + _)
+    }
+  }
+}
+
+// This class is not serializable, but we aren't using any of its fields in our
+// closures, so they won't have a $outer pointing to it and should still work.
+class TestClassWithoutFieldAccess {
+  var nonSer = new NonSerializable
+
+  def run(): Int = {
+    var nonSer2 = new NonSerializable
+    var x = 5
+    return withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      nums.map(_ + x).reduce(_ + _)
+    }
+  }
+}
+
+
+object TestObjectWithNesting {
+  def run(): Int = {
+    var nonSer = new NonSerializable
+    var answer = 0
+    return withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      var y = 1
+      for (i <- 1 to 4) {
+        var nonSer2 = new NonSerializable
+        var x = i
+        answer += nums.map(_ + x + y).reduce(_ + _)
+      }
+      answer
+    }
+  }
+}
+
+class TestClassWithNesting(val y: Int) extends Serializable {
+  def getY = y
+
+  def run(): Int = {
+    var nonSer = new NonSerializable
+    var answer = 0
+    return withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      for (i <- 1 to 4) {
+        var nonSer2 = new NonSerializable
+        var x = i
+        answer += nums.map(_ + x + getY).reduce(_ + _)
+      }
+      answer
+    }
+  }
+}
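
For reference, a minimal sketch of the capture problem the ClosureCleaner exists to solve; the class is illustrative and the snippet is not part of this diff. Copying a field into a local val keeps the closure from dragging the enclosing instance along with it:

    import org.apache.spark.SparkContext

    class Lookup {                                      // not Serializable
      val offset = 10
      def shifted(sc: SparkContext) = {
        val localOffset = offset                        // capture only the value,
        sc.parallelize(1 to 4).map(_ + localOffset)     // not the enclosing `this`
      }
    }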

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
new file mode 100644
index 0000000..7a856d4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import network.ConnectionManagerId
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.prop.Checkers
+import org.scalatest.time.{Span, Millis}
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+import org.eclipse.jetty.server.{Server, Request, Handler}
+
+import com.google.common.io.Files
+
+import scala.collection.mutable.ArrayBuffer
+
+import SparkContext._
+import storage.{GetBlock, BlockManagerWorker, StorageLevel}
+import ui.JettyUtils
+
+
+class NotSerializableClass
+class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
+
+
+class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
+  with LocalSparkContext {
+
+  val clusterUrl = "local-cluster[2,1,512]"
+
+  after {
+    System.clearProperty("spark.reducer.maxMbInFlight")
+    System.clearProperty("spark.storage.memoryFraction")
+  }
+
+  test("task throws not serializable exception") {
+    // Ensures that executors do not crash when an exception is not serializable. If executors crash,
+    // this test will hang. Correct behavior is that executors don't crash but fail tasks
+    // and the scheduler throws a SparkException.
+
+    // numSlaves must be less than numPartitions
+    val numSlaves = 3
+    val numPartitions = 10
+
+    sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
+    val data = sc.parallelize(1 to 100, numPartitions).
+      map(x => throw new NotSerializableExn(new NotSerializableClass))
+    intercept[SparkException] {
+      data.count()
+    }
+    resetSparkContext()
+  }
+
+  test("local-cluster format") {
+    sc = new SparkContext("local-cluster[2,1,512]", "test")
+    assert(sc.parallelize(1 to 2, 2).count() == 2)
+    resetSparkContext()
+    sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
+    assert(sc.parallelize(1 to 2, 2).count() == 2)
+    resetSparkContext()
+    sc = new SparkContext("local-cluster[2, 1, 512]", "test")
+    assert(sc.parallelize(1 to 2, 2).count() == 2)
+    resetSparkContext()
+    sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
+    assert(sc.parallelize(1 to 2, 2).count() == 2)
+    resetSparkContext()
+  }
+
+  test("simple groupByKey") {
+    sc = new SparkContext(clusterUrl, "test")
+    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
+    val groups = pairs.groupByKey(5).collect()
+    assert(groups.size === 2)
+    val valuesFor1 = groups.find(_._1 == 1).get._2
+    assert(valuesFor1.toList.sorted === List(1, 2, 3))
+    val valuesFor2 = groups.find(_._1 == 2).get._2
+    assert(valuesFor2.toList.sorted === List(1))
+  }
+
+  test("groupByKey where map output sizes exceed maxMbInFlight") {
+    System.setProperty("spark.reducer.maxMbInFlight", "1")
+    sc = new SparkContext(clusterUrl, "test")
+    // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
+    // file should be about 2.5 MB
+    val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
+    val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
+    assert(groups.length === 16)
+    assert(groups.map(_._2).sum === 2000)
+    // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
+  }
+
+  test("accumulators") {
+    sc = new SparkContext(clusterUrl, "test")
+    val accum = sc.accumulator(0)
+    sc.parallelize(1 to 10, 10).foreach(x => accum += x)
+    assert(accum.value === 55)
+  }
+
+  test("broadcast variables") {
+    sc = new SparkContext(clusterUrl, "test")
+    val array = new Array[Int](100)
+    val bv = sc.broadcast(array)
+    array(2) = 3     // Change the array -- this should not be seen on workers
+    val rdd = sc.parallelize(1 to 10, 10)
+    val sum = rdd.map(x => bv.value.sum).reduce(_ + _)
+    assert(sum === 0)
+  }
+
+  test("repeatedly failing task") {
+    sc = new SparkContext(clusterUrl, "test")
+    val accum = sc.accumulator(0)
+    val thrown = intercept[SparkException] {
+      sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getMessage.contains("more than 4 times"))
+  }
+
+  test("caching") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).cache()
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching on disk") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching in memory, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching in memory, serialized, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching on disk, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching in memory and disk, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+  }
+
+  test("caching in memory and disk, serialized, replicated") {
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
+
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+    assert(data.count() === 1000)
+
+    // Get all the locations of the first partition and try to fetch the partitions
+    // from those locations.
+    val blockIds = data.partitions.indices.map(index => "rdd_%d_%d".format(data.id, index)).toArray
+    val blockId = blockIds(0)
+    val blockManager = SparkEnv.get.blockManager
+    blockManager.master.getLocations(blockId).foreach(id => {
+      val bytes = BlockManagerWorker.syncGetBlock(
+        GetBlock(blockId), ConnectionManagerId(id.host, id.port))
+      val deserialized = blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList
+      assert(deserialized === (1 to 100).toList)
+    })
+  }
+
+  test("compute without caching when no partitions fit in memory") {
+    System.setProperty("spark.storage.memoryFraction", "0.0001")
+    sc = new SparkContext(clusterUrl, "test")
+    // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
+    // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
+    val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
+    assert(data.count() === 4000000)
+    assert(data.count() === 4000000)
+    assert(data.count() === 4000000)
+    System.clearProperty("spark.storage.memoryFraction")
+  }
+
+  test("compute when only some partitions fit in memory") {
+    System.setProperty("spark.storage.memoryFraction", "0.01")
+    sc = new SparkContext(clusterUrl, "test")
+    // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
+    // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
+    // to make sure that *some* of them do fit though
+    val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
+    assert(data.count() === 4000000)
+    assert(data.count() === 4000000)
+    assert(data.count() === 4000000)
+    System.clearProperty("spark.storage.memoryFraction")
+  }
+
+  test("passing environment variables to cluster") {
+    sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE"))
+    val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
+    assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
+  }
+
+  test("recover from node failures") {
+    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+    DistributedSuite.amMaster = true
+    sc = new SparkContext(clusterUrl, "test")
+    val data = sc.parallelize(Seq(true, true), 2)
+    assert(data.count === 2) // force executors to start
+    assert(data.map(markNodeIfIdentity).collect.size === 2)
+    assert(data.map(failOnMarkedIdentity).collect.size === 2)
+  }
+
+  test("recover from repeated node failures during shuffle-map") {
+    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+    DistributedSuite.amMaster = true
+    sc = new SparkContext(clusterUrl, "test")
+    for (i <- 1 to 3) {
+      val data = sc.parallelize(Seq(true, false), 2)
+      assert(data.count === 2)
+      assert(data.map(markNodeIfIdentity).collect.size === 2)
+      assert(data.map(failOnMarkedIdentity).map(x => x -> x).groupByKey.count === 2)
+    }
+  }
+
+  test("recover from repeated node failures during shuffle-reduce") {
+    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+    DistributedSuite.amMaster = true
+    sc = new SparkContext(clusterUrl, "test")
+    for (i <- 1 to 3) {
+      val data = sc.parallelize(Seq(true, true), 2)
+      assert(data.count === 2)
+      assert(data.map(markNodeIfIdentity).collect.size === 2)
+      // This relies on mergeCombiners being used to perform the actual reduce for this
+      // test to actually be testing what it claims.
+      val grouped = data.map(x => x -> x).combineByKey(
+                      x => x,
+                      (x: Boolean, y: Boolean) => x,
+                      (x: Boolean, y: Boolean) => failOnMarkedIdentity(x)
+                    )
+      assert(grouped.collect.size === 1)
+    }
+  }
+
+  test("recover from node failures with replication") {
+    import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+    DistributedSuite.amMaster = true
+    // Use more than two nodes so the communication pattern is not symmetric and a node
+    // might end up caching a partially correct list of peers.
+    sc = new SparkContext("local-cluster[3,1,512]", "test")
+    for (i <- 1 to 3) {
+      val data = sc.parallelize(Seq(true, false, false, false), 4)
+      data.persist(StorageLevel.MEMORY_ONLY_2)
+
+      assert(data.count === 4)
+      assert(data.map(markNodeIfIdentity).collect.size === 4)
+      assert(data.map(failOnMarkedIdentity).collect.size === 4)
+
+      // Create a new replicated RDD to make sure that cached peer information doesn't cause
+      // problems.
+      val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
+      assert(data2.count === 2)
+    }
+  }
+
+  test("unpersist RDDs") {
+    DistributedSuite.amMaster = true
+    sc = new SparkContext("local-cluster[3,1,512]", "test")
+    val data = sc.parallelize(Seq(true, false, false, false), 4)
+    data.persist(StorageLevel.MEMORY_ONLY_2)
+    data.count
+    assert(sc.persistentRdds.isEmpty === false)
+    data.unpersist()
+    assert(sc.persistentRdds.isEmpty === true)
+
+    failAfter(Span(3000, Millis)) {
+      try {
+        while (! sc.getRDDStorageInfo.isEmpty) {
+          Thread.sleep(200)
+        }
+      } catch {
+        case _ => { Thread.sleep(10) }
+          // Do nothing. We might see exceptions because block manager
+          // is racing this thread to remove entries from the driver.
+      }
+    }
+  }
+
+  test("job should fail if TaskResult exceeds Akka frame size") {
+    // We must use local-cluster mode since results are returned differently
+    // when running under LocalScheduler:
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val akkaFrameSize =
+      sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+    val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
+    val exception = intercept[SparkException] {
+      rdd.reduce((x, y) => x)
+    }
+    exception.getMessage should endWith("result exceeded Akka frame size")
+  }
+}
+
+object DistributedSuite {
+  // Indicates whether this JVM is marked for failure.
+  var mark = false
+
+  // Set by test to remember if we are in the driver program so we can assert
+  // that we are not.
+  var amMaster = false
+
+  // Act like an identity function, but if the argument is true, set mark to true.
+  def markNodeIfIdentity(item: Boolean): Boolean = {
+    if (item) {
+      assert(!amMaster)
+      mark = true
+    }
+    item
+  }
+
+  // Act like an identity function, but if mark was set to true previously, fail,
+  // crashing the entire JVM.
+  def failOnMarkedIdentity(item: Boolean): Boolean = {
+    if (mark) {
+      System.exit(42)
+    }
+    item
+  }
+}
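
For reference, "local-cluster[N,cores,memoryMB]" launches N worker JVMs on the local machine, which is how this suite simulates a real cluster. A minimal sketch, not part of this diff:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._

    val sc = new SparkContext("local-cluster[2,1,512]", "distributed-example")
    val counts = sc.parallelize(1 to 1000, 10)
      .map(x => (x % 10, 1))
      .reduceByKey(_ + _)          // forces a shuffle across the two worker JVMs
      .collect()
    sc.stop()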

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
new file mode 100644
index 0000000..b08aad1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.prop.TableDrivenPropertyChecks._
+import org.scalatest.time.SpanSugar._
+
+class DriverSuite extends FunSuite with Timeouts {
+  test("driver should exit after finishing") {
+    assert(System.getenv("SPARK_HOME") != null)
+    // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
+    val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
+    forAll(masters) { (master: String) =>
+      failAfter(30 seconds) {
+        Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
+          new File(System.getenv("SPARK_HOME")))
+      }
+    }
+  }
+}
+
+/**
+ * Program that creates a Spark driver but doesn't call SparkContext.stop() or
+ * Sys.exit() after finishing.
+ */
+object DriverWithoutCleanup {
+  def main(args: Array[String]) {
+    Logger.getRootLogger().setLevel(Level.WARN)
+    val sc = new SparkContext(args(0), "DriverWithoutCleanup")
+    sc.parallelize(1 to 100, 4).count()
+  }
+}
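
For contrast with DriverWithoutCleanup above, a well-behaved driver stops its SparkContext explicitly. This counterpart is illustrative only and not part of this diff:

    import org.apache.spark.SparkContext

    object DriverWithCleanup {
      def main(args: Array[String]) {
        val sc = new SparkContext(args(0), "DriverWithCleanup")
        sc.parallelize(1 to 100, 4).count()
        sc.stop()                 // release driver resources so the JVM can exit promptly
      }
    }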

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
new file mode 100644
index 0000000..ee89a7a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+import SparkContext._
+
+// Common state shared by FailureSuite-launched tasks. We use a global object
+// for this because any local variables used in the task closures will rightfully
+// be copied for each task, so there's no other way for them to share state.
+object FailureSuiteState {
+  var tasksRun = 0
+  var tasksFailed = 0
+
+  def clear() {
+    synchronized {
+      tasksRun = 0
+      tasksFailed = 0
+    }
+  }
+}
+
+class FailureSuite extends FunSuite with LocalSparkContext {
+
+  // Run a 3-task map job in which task 1 deterministically fails once, and check
+  // whether the job completes successfully and we ran 4 tasks in total.
+  test("failure in a single-stage job") {
+    sc = new SparkContext("local[1,1]", "test")
+    val results = sc.makeRDD(1 to 3, 3).map { x =>
+      FailureSuiteState.synchronized {
+        FailureSuiteState.tasksRun += 1
+        if (x == 1 && FailureSuiteState.tasksFailed == 0) {
+          FailureSuiteState.tasksFailed += 1
+          throw new Exception("Intentional task failure")
+        }
+      }
+      x * x
+    }.collect()
+    FailureSuiteState.synchronized {
+      assert(FailureSuiteState.tasksRun === 4)
+    }
+    assert(results.toList === List(1,4,9))
+    FailureSuiteState.clear()
+  }
+
+  // Run a map-reduce job in which a reduce task deterministically fails once.
+  test("failure in a two-stage job") {
+    sc = new SparkContext("local[1,1]", "test")
+    val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
+      case (k, v) =>
+        FailureSuiteState.synchronized {
+          FailureSuiteState.tasksRun += 1
+          if (k == 1 && FailureSuiteState.tasksFailed == 0) {
+            FailureSuiteState.tasksFailed += 1
+            throw new Exception("Intentional task failure")
+          }
+        }
+        (k, v(0) * v(0))
+      }.collect()
+    FailureSuiteState.synchronized {
+      assert(FailureSuiteState.tasksRun === 4)
+    }
+    assert(results.toSet === Set((1, 1), (2, 4), (3, 9)))
+    FailureSuiteState.clear()
+  }
+
+  test("failure because task results are not serializable") {
+    sc = new SparkContext("local[1,1]", "test")
+    val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
+
+    val thrown = intercept[SparkException] {
+      results.collect()
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getMessage.contains("NotSerializableException"))
+
+    FailureSuiteState.clear()
+  }
+
+  test("failure because task closure is not serializable") {
+    sc = new SparkContext("local[1,1]", "test")
+    val a = new NonSerializable
+
+    // Non-serializable closure in the final result stage
+    val thrown = intercept[SparkException] {
+      sc.parallelize(1 to 10, 2).map(x => a).count()
+    }
+    assert(thrown.getClass === classOf[SparkException])
+    assert(thrown.getMessage.contains("NotSerializableException"))
+
+    // Non-serializable closure in an earlier stage
+    val thrown1 = intercept[SparkException] {
+      sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count()
+    }
+    assert(thrown1.getClass === classOf[SparkException])
+    assert(thrown1.getMessage.contains("NotSerializableException"))
+
+    // Non-serializable closure in foreach function
+    val thrown2 = intercept[SparkException] {
+      sc.parallelize(1 to 10, 2).foreach(x => println(a))
+    }
+    assert(thrown2.getClass === classOf[SparkException])
+    assert(thrown2.getMessage.contains("NotSerializableException"))
+
+    FailureSuiteState.clear()
+  }
+
+  // TODO: Need to add tests with shuffle fetch failures.
+}
+
+
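
For reference, a minimal sketch of the retry behaviour this suite depends on; the master string "local[1,1]" mirrors the tests above, and the shared object works only because every attempt runs in the same local JVM. Not part of this diff:

    import org.apache.spark.SparkContext

    object FlakyState { var failedOnce = false }        // plays the role of FailureSuiteState

    val sc = new SparkContext("local[1,1]", "failure-example")
    val squares = sc.makeRDD(1 to 3, 3).map { x =>
      FlakyState.synchronized {
        if (x == 2 && !FlakyState.failedOnce) {
          FlakyState.failedOnce = true
          throw new RuntimeException("intentional transient failure")   // first attempt fails
        }
      }
      x * x
    }.collect()                                         // retried task succeeds: 1, 4, 9
    sc.stop()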

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
new file mode 100644
index 0000000..35d1d41
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import com.google.common.io.Files
+import org.scalatest.FunSuite
+import java.io.{File, PrintWriter, FileReader, BufferedReader}
+import SparkContext._
+
+class FileServerSuite extends FunSuite with LocalSparkContext {
+
+  @transient var tmpFile: File = _
+  @transient var testJarFile: File = _
+
+  override def beforeEach() {
+    super.beforeEach()
+    // Create a sample text file
+    val tmpdir = new File(Files.createTempDir(), "test")
+    tmpdir.mkdir()
+    tmpFile = new File(tmpdir, "FileServerSuite.txt")
+    val pw = new PrintWriter(tmpFile)
+    pw.println("100")
+    pw.close()
+  }
+
+  override def afterEach() {
+    super.afterEach()
+    // Clean up downloaded file
+    if (tmpFile.exists) {
+      tmpFile.delete()
+    }
+  }
+
+  test("Distributing files locally") {
+    sc = new SparkContext("local[4]", "test")
+    sc.addFile(tmpFile.toString)
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+    val result = sc.parallelize(testData).reduceByKey {
+      val path = SparkFiles.get("FileServerSuite.txt")
+      val in = new BufferedReader(new FileReader(path))
+      val fileVal = in.readLine().toInt
+      in.close()
+      _ * fileVal + _ * fileVal
+    }.collect()
+    assert(result.toSet === Set((1,200), (2,300), (3,500)))
+  }
+
+  test("Distributing files locally using URL as input") {
+    // addFile("file:///....")
+    sc = new SparkContext("local[4]", "test")
+    sc.addFile(new File(tmpFile.toString).toURI.toString)
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+    val result = sc.parallelize(testData).reduceByKey {
+      val path = SparkFiles.get("FileServerSuite.txt")
+      val in = new BufferedReader(new FileReader(path))
+      val fileVal = in.readLine().toInt
+      in.close()
+      _ * fileVal + _ * fileVal
+    }.collect()
+    assert(result.toSet === Set((1,200), (2,300), (3,500)))
+  }
+
+  test ("Dynamically adding JARS locally") {
+    sc = new SparkContext("local[4]", "test")
+    val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+    sc.addJar(sampleJarFile)
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+    val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+      val fac = Thread.currentThread.getContextClassLoader()
+                                    .loadClass("org.uncommons.maths.Maths")
+                                    .getDeclaredMethod("factorial", classOf[Int])
+      val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      a + b
+    }.collect()
+    assert(result.toSet === Set((1,2), (2,7), (3,121)))
+  }
+
+  test("Distributing files on a standalone cluster") {
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    sc.addFile(tmpFile.toString)
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+    val result = sc.parallelize(testData).reduceByKey {
+      val path = SparkFiles.get("FileServerSuite.txt")
+      val in = new BufferedReader(new FileReader(path))
+      val fileVal = in.readLine().toInt
+      in.close()
+      _ * fileVal + _ * fileVal
+    }.collect()
+    assert(result.toSet === Set((1,200), (2,300), (3,500)))
+  }
+
+  test ("Dynamically adding JARS on a standalone cluster") {
+    sc = new SparkContext("local-cluster[1,1,512]", "test")
+    val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+    sc.addJar(sampleJarFile)
+    val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+    val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+      val fac = Thread.currentThread.getContextClassLoader()
+                                    .loadClass("org.uncommons.maths.Maths")
+                                    .getDeclaredMethod("factorial", classOf[Int])
+      val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+      a + b
+    }.collect()
+    assert(result.toSet === Set((1,2), (2,7), (3,121)))
+  }
+}
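
For readers who want to exercise the file-distribution path outside the test harness, here is a minimal standalone sketch of the same addFile / SparkFiles.get pattern, reading the file inside the reduce closure. It is not part of this patch; the master URL, application name, and file path are placeholders, and the file is assumed to hold a single integer (e.g. "100").

import java.io.{BufferedReader, FileReader}

import org.apache.spark.{SparkContext, SparkFiles}
import org.apache.spark.SparkContext._

object FileServerExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[4]", "file-server-example")  // placeholder master and app name
    sc.addFile("/tmp/multiplier.txt")                             // hypothetical file holding one integer, e.g. "100"
    val result = sc.parallelize(Seq((1, 1), (1, 2), (2, 3), (2, 4))).reduceByKey { (x, y) =>
      // Each task resolves its own local copy of the distributed file by name.
      val in = new BufferedReader(new FileReader(SparkFiles.get("multiplier.txt")))
      val factor = try in.readLine().toInt finally in.close()
      x * factor + y * factor
    }.collect()
    result.foreach(println)  // with "100": (1,300) and (2,700)
    sc.stop()
  }
}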

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
new file mode 100644
index 0000000..7b82a4c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.{FileWriter, PrintWriter, File}
+
+import scala.io.Source
+
+import com.google.common.io.Files
+import org.scalatest.FunSuite
+import org.apache.hadoop.io._
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodec, GzipCodec}
+
+
+import SparkContext._
+
+class FileSuite extends FunSuite with LocalSparkContext {
+
+  test("text files") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 4)
+    nums.saveAsTextFile(outputDir)
+    // Read the plain text file and check it's OK
+    val outputFile = new File(outputDir, "part-00000")
+    val content = Source.fromFile(outputFile).mkString
+    assert(content === "1\n2\n3\n4\n")
+    // Also try reading it in as a text file RDD
+    assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+  }
+
+  test("text files (compressed)") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
+    val codec = new DefaultCodec()
+
+    val data = sc.parallelize("a" * 10000, 1)
+    data.saveAsTextFile(normalDir)
+    data.saveAsTextFile(compressedOutputDir, classOf[DefaultCodec])
+
+    val normalFile = new File(normalDir, "part-00000")
+    val normalContent = sc.textFile(normalDir).collect
+    assert(normalContent === Array.fill(10000)("a"))
+
+    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+    val compressedContent = sc.textFile(compressedOutputDir).collect
+    assert(compressedContent === Array.fill(10000)("a"))
+
+    assert(compressedFile.length < normalFile.length)
+  }
+
+  test("SequenceFiles") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
+    nums.saveAsSequenceFile(outputDir)
+    // Try reading the output back as a SequenceFile
+    val output = sc.sequenceFile[IntWritable, Text](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+  }
+
+  test("SequenceFile (compressed)") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+    val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
+    val codec = new DefaultCodec()
+
+    val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
+    data.saveAsSequenceFile(normalDir)
+    data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))
+
+    val normalFile = new File(normalDir, "part-00000")
+    val normalContent = sc.sequenceFile[String, String](normalDir).collect
+    assert(normalContent === Array.fill(100)("abc", "abc"))
+
+    val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+    val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
+    assert(compressedContent === Array.fill(100)("abc", "abc"))
+
+    assert(compressedFile.length < normalFile.length)
+  }
+
+  test("SequenceFile with writable key") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
+    nums.saveAsSequenceFile(outputDir)
+    // Try reading the output back as a SequenceFile
+    val output = sc.sequenceFile[IntWritable, Text](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+  }
+
+  test("SequenceFile with writable value") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
+    nums.saveAsSequenceFile(outputDir)
+    // Try reading the output back as a SequenceFile
+    val output = sc.sequenceFile[IntWritable, Text](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+  }
+
+  test("SequenceFile with writable key and value") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
+    nums.saveAsSequenceFile(outputDir)
+    // Try reading the output back as a SequenceFile
+    val output = sc.sequenceFile[IntWritable, Text](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+  }
+
+  test("implicit conversions in reading SequenceFiles") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
+    nums.saveAsSequenceFile(outputDir)
+    // Similar to the tests above, we read a SequenceFile, but this time we pass type params
+    // that are convertible to Writable instead of calling sequenceFile[IntWritable, Text]
+    val output1 = sc.sequenceFile[Int, String](outputDir)
+    assert(output1.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
+    // Also try having one type be a subclass of Writable and one not
+    val output2 = sc.sequenceFile[Int, Text](outputDir)
+    assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+    val output3 = sc.sequenceFile[IntWritable, String](outputDir)
+    assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+  }
+
+  test("object files of ints") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 4)
+    nums.saveAsObjectFile(outputDir)
+    // Try reading the output back as an object file
+    val output = sc.objectFile[Int](outputDir)
+    assert(output.collect().toList === List(1, 2, 3, 4))
+  }
+
+  test("object files of complex types") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
+    nums.saveAsObjectFile(outputDir)
+    // Try reading the output back as an object file
+    val output = sc.objectFile[(Int, String)](outputDir)
+    assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
+  }
+
+  test("write SequenceFile using new Hadoop API") {
+    import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
+    nums.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, Text]](
+        outputDir)
+    val output = sc.sequenceFile[IntWritable, Text](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+  }
+
+  test("read SequenceFile using new Hadoop API") {
+    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val outputDir = new File(tempDir, "output").getAbsolutePath
+    val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
+    nums.saveAsSequenceFile(outputDir)
+    val output =
+        sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
+    assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+  }
+
+  test("file caching") {
+    sc = new SparkContext("local", "test")
+    val tempDir = Files.createTempDir()
+    val out = new FileWriter(tempDir + "/input")
+    out.write("Hello world!\n")
+    out.write("What's up?\n")
+    out.write("Goodbye\n")
+    out.close()
+    val rdd = sc.textFile(tempDir + "/input").cache()
+    assert(rdd.count() === 3)
+    assert(rdd.count() === 3)
+    assert(rdd.count() === 3)
+  }
+}
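
FileSuite's SequenceFile tests lean on the implicit Writable conversions imported from SparkContext._. A similarly hedged standalone sketch of that round trip follows; it is not part of the patch, and the object name, master URL, and temp directory are illustrative.

import java.io.File

import com.google.common.io.Files
import org.apache.hadoop.io.{IntWritable, Text}

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SequenceFileExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "sequencefile-example")  // placeholder master and app name
    val outputDir = new File(Files.createTempDir(), "output").getAbsolutePath
    // Primitive pairs are wrapped in Writables on write by the implicits from SparkContext._
    sc.makeRDD(1 to 3).map(x => (x, "a" * x)).saveAsSequenceFile(outputDir)
    // Read back as Writables (copy values out before collecting, since Hadoop reuses the objects) ...
    val asWritables = sc.sequenceFile[IntWritable, Text](outputDir)
      .map { case (k, v) => (k.get, v.toString) }
    // ... or let the same implicits hand back plain Scala types.
    val asPrimitives = sc.sequenceFile[Int, String](outputDir)
    println(asWritables.collect().toList)   // List((1,a), (2,aa), (3,aaa))
    println(asPrimitives.collect().toList)  // List((1,a), (2,aa), (3,aaa))
    sc.stop()
  }
}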