Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:56 UTC
[12/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
new file mode 100644
index 0000000..4434f3b
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+import org.scalatest.matchers.ShouldMatchers
+import collection.mutable
+import java.util.Random
+import scala.math.exp
+import scala.math.signum
+import org.apache.spark.SparkContext._
+
+class AccumulatorSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
+
+ test ("basic accumulation"){
+ sc = new SparkContext("local", "test")
+ val acc : Accumulator[Int] = sc.accumulator(0)
+
+ val d = sc.parallelize(1 to 20)
+ d.foreach{x => acc += x}
+ acc.value should be (210)
+
+
+ val longAcc = sc.accumulator(0l)
+ val maxInt = Integer.MAX_VALUE.toLong
+ d.foreach{x => longAcc += maxInt + x}
+ longAcc.value should be (210l + maxInt * 20)
+ }
+
+ test ("value not assignable from tasks") {
+ sc = new SparkContext("local", "test")
+ val acc : Accumulator[Int] = sc.accumulator(0)
+
+ val d = sc.parallelize(1 to 20)
+ evaluating {d.foreach{x => acc.value = x}} should produce [Exception]
+ }
+
+ test ("add value to collection accumulators") {
+ import SetAccum._
+ val maxI = 1000
+ for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ sc = new SparkContext("local[" + nThreads + "]", "test")
+ val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+ val d = sc.parallelize(1 to maxI)
+ d.foreach {
+ x => acc += x
+ }
+ val v = acc.value.asInstanceOf[mutable.Set[Int]]
+ for (i <- 1 to maxI) {
+ v should contain(i)
+ }
+ resetSparkContext()
+ }
+ }
+
+ implicit object SetAccum extends AccumulableParam[mutable.Set[Any], Any] {
+ def addInPlace(t1: mutable.Set[Any], t2: mutable.Set[Any]) : mutable.Set[Any] = {
+ t1 ++= t2
+ t1
+ }
+ def addAccumulator(t1: mutable.Set[Any], t2: Any) : mutable.Set[Any] = {
+ t1 += t2
+ t1
+ }
+ def zero(t: mutable.Set[Any]) : mutable.Set[Any] = {
+ new mutable.HashSet[Any]()
+ }
+ }
+
+ test ("value not readable in tasks") {
+ import SetAccum._
+ val maxI = 1000
+ for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ sc = new SparkContext("local[" + nThreads + "]", "test")
+ val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+ val d = sc.parallelize(1 to maxI)
+ evaluating {
+ d.foreach {
+ x => acc.value += x
+ }
+ } should produce [SparkException]
+ resetSparkContext()
+ }
+ }
+
+ test ("collection accumulators") {
+ val maxI = 1000
+ for (nThreads <- List(1, 10)) {
+ // test single & multi-threaded
+ sc = new SparkContext("local[" + nThreads + "]", "test")
+ val setAcc = sc.accumulableCollection(mutable.HashSet[Int]())
+ val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]())
+ val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]())
+ val d = sc.parallelize((1 to maxI) ++ (1 to maxI))
+ d.foreach {
+ x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)}
+ }
+
+ // Note that this is typed correctly -- no casts necessary
+ setAcc.value.size should be (maxI)
+ bufferAcc.value.size should be (2 * maxI)
+ mapAcc.value.size should be (maxI)
+ for (i <- 1 to maxI) {
+ setAcc.value should contain(i)
+ bufferAcc.value should contain(i)
+ mapAcc.value should contain (i -> i.toString)
+ }
+ resetSparkContext()
+ }
+ }
+
+ test ("localValue readable in tasks") {
+ import SetAccum._
+ val maxI = 1000
+ for (nThreads <- List(1, 10)) { //test single & multi-threaded
+ sc = new SparkContext("local[" + nThreads + "]", "test")
+ val acc: Accumulable[mutable.Set[Any], Any] = sc.accumulable(new mutable.HashSet[Any]())
+ val groupedInts = (1 to (maxI/20)).map {x => (20 * (x - 1) to 20 * x).toSet}
+ val d = sc.parallelize(groupedInts)
+ d.foreach {
+ x => acc.localValue ++= x
+ }
+ acc.value should be ( (0 to maxI).toSet)
+ resetSparkContext()
+ }
+ }
+
+}
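
The suite above centers on a user-defined AccumulableParam (SetAccum) that tells Spark how to add a single element, merge two partial results, and produce a zero value. As a hedged, self-contained sketch of the same pattern outside the test harness -- the object name, application name, local master URL, and String element type are illustrative assumptions, not part of this commit:

    import scala.collection.mutable
    import org.apache.spark.{Accumulable, AccumulableParam, SparkContext}

    object AccumulableSketch {
      // Mirrors SetAccum above: add one element, merge two sets, define the zero value.
      implicit object StringSetParam extends AccumulableParam[mutable.Set[String], String] {
        def addAccumulator(acc: mutable.Set[String], elem: String): mutable.Set[String] = { acc += elem; acc }
        def addInPlace(s1: mutable.Set[String], s2: mutable.Set[String]): mutable.Set[String] = { s1 ++= s2; s1 }
        def zero(initial: mutable.Set[String]): mutable.Set[String] = new mutable.HashSet[String]()
      }

      def main(args: Array[String]) {
        val sc = new SparkContext("local", "accumulable-sketch")  // assumed local master, as in the tests
        val seen: Accumulable[mutable.Set[String], String] =
          sc.accumulable(new mutable.HashSet[String]())            // resolves StringSetParam implicitly
        sc.parallelize(Seq("spark", "core", "spark")).foreach(word => seen += word)
        println(seen.value)                                        // value is only readable on the driver
        sc.stop()
      }
    }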
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
new file mode 100644
index 0000000..b3a53d9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/BroadcastSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+class BroadcastSuite extends FunSuite with LocalSparkContext {
+
+ test("basic broadcast") {
+ sc = new SparkContext("local", "test")
+ val list = List(1, 2, 3, 4)
+ val listBroadcast = sc.broadcast(list)
+ val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
+ assert(results.collect.toSet === Set((1, 10), (2, 10)))
+ }
+
+ test("broadcast variables accessed in multiple threads") {
+ sc = new SparkContext("local[10]", "test")
+ val list = List(1, 2, 3, 4)
+ val listBroadcast = sc.broadcast(list)
+ val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
+ assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
new file mode 100644
index 0000000..23b14f4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+import java.io.File
+import org.apache.spark.rdd._
+import org.apache.spark.SparkContext._
+import storage.StorageLevel
+
+class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
+ initLogging()
+
+ var checkpointDir: File = _
+ val partitioner = new HashPartitioner(2)
+
+ override def beforeEach() {
+ super.beforeEach()
+ checkpointDir = File.createTempFile("temp", "")
+ checkpointDir.delete()
+ sc = new SparkContext("local", "test")
+ sc.setCheckpointDir(checkpointDir.toString)
+ }
+
+ override def afterEach() {
+ super.afterEach()
+ if (checkpointDir != null) {
+ checkpointDir.delete()
+ }
+ }
+
+ test("basic checkpointing") {
+ val parCollection = sc.makeRDD(1 to 4)
+ val flatMappedRDD = parCollection.flatMap(x => 1 to x)
+ flatMappedRDD.checkpoint()
+ assert(flatMappedRDD.dependencies.head.rdd == parCollection)
+ val result = flatMappedRDD.collect()
+ assert(flatMappedRDD.dependencies.head.rdd != parCollection)
+ assert(flatMappedRDD.collect() === result)
+ }
+
+ test("RDDs with one-to-one dependencies") {
+ testCheckpointing(_.map(x => x.toString))
+ testCheckpointing(_.flatMap(x => 1 to x))
+ testCheckpointing(_.filter(_ % 2 == 0))
+ testCheckpointing(_.sample(false, 0.5, 0))
+ testCheckpointing(_.glom())
+ testCheckpointing(_.mapPartitions(_.map(_.toString)))
+ testCheckpointing(r => new MapPartitionsWithIndexRDD(r,
+ (i: Int, iter: Iterator[Int]) => iter.map(_.toString), false ))
+ testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
+ testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
+ testCheckpointing(_.pipe(Seq("cat")))
+ }
+
+ test("ParallelCollection") {
+ val parCollection = sc.makeRDD(1 to 4, 2)
+ val numPartitions = parCollection.partitions.size
+ parCollection.checkpoint()
+ assert(parCollection.dependencies === Nil)
+ val result = parCollection.collect()
+ assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result)
+ assert(parCollection.dependencies != Nil)
+ assert(parCollection.partitions.length === numPartitions)
+ assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList)
+ assert(parCollection.collect() === result)
+ }
+
+ test("BlockRDD") {
+ val blockId = "id"
+ val blockManager = SparkEnv.get.blockManager
+ blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY)
+ val blockRDD = new BlockRDD[String](sc, Array(blockId))
+ val numPartitions = blockRDD.partitions.size
+ blockRDD.checkpoint()
+ val result = blockRDD.collect()
+ assert(sc.checkpointFile[String](blockRDD.getCheckpointFile.get).collect() === result)
+ assert(blockRDD.dependencies != Nil)
+ assert(blockRDD.partitions.length === numPartitions)
+ assert(blockRDD.partitions.toList === blockRDD.checkpointData.get.getPartitions.toList)
+ assert(blockRDD.collect() === result)
+ }
+
+ test("ShuffledRDD") {
+ testCheckpointing(rdd => {
+ // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
+ new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
+ })
+ }
+
+ test("UnionRDD") {
+ def otherRDD = sc.makeRDD(1 to 10, 1)
+
+    // Test whether the UnionRDD's partitions reduce in serialized size after the parent RDD is checkpointed.
+    // The current implementation of UnionRDD keeps only transient references to its parent RDDs,
+    // so only the partitions will reduce in serialized size, not the RDD.
+ testCheckpointing(_.union(otherRDD), false, true)
+ testParentCheckpointing(_.union(otherRDD), false, true)
+ }
+
+ test("CartesianRDD") {
+ def otherRDD = sc.makeRDD(1 to 10, 1)
+ testCheckpointing(new CartesianRDD(sc, _, otherRDD))
+
+    // Test whether the CartesianRDD reduces in serialized size after the parent RDD is checkpointed.
+    // The current implementation of CartesianPartition has only transient references to the parent RDDs,
+    // so only the RDD will reduce in serialized size, not the partitions.
+ testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false)
+
+    // Test that the CartesianRDD updates its parent partitions (CartesianPartition.s1/s2) after
+    // the parent RDD has been checkpointed and its partitions have been changed to HadoopPartitions.
+    // Note that this test is very specific to the current implementation of CartesianRDD.
+ val ones = sc.makeRDD(1 to 100, 10).map(x => x)
+ ones.checkpoint() // checkpoint that MappedRDD
+ val cartesian = new CartesianRDD(sc, ones, ones)
+ val splitBeforeCheckpoint =
+ serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
+ cartesian.count() // do the checkpointing
+ val splitAfterCheckpoint =
+ serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
+ assert(
+ (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) &&
+ (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2),
+ "CartesianRDD.parents not updated after parent RDD checkpointed"
+ )
+ }
+
+ test("CoalescedRDD") {
+ testCheckpointing(_.coalesce(2))
+
+    // Test whether the CoalescedRDD reduces in serialized size after the parent RDD is checkpointed.
+    // The current implementation of CoalescedRDDPartition has a transient reference to the parent RDD,
+    // so only the RDD will reduce in serialized size, not the partitions.
+ testParentCheckpointing(_.coalesce(2), true, false)
+
+    // Test that the CoalescedRDDPartition updates its parent partitions (CoalescedRDDPartition.parents) after
+    // the parent RDD has been checkpointed and its partitions have been changed to HadoopPartitions.
+    // Note that this test is very specific to the current implementation of CoalescedRDDPartition.
+ val ones = sc.makeRDD(1 to 100, 10).map(x => x)
+ ones.checkpoint() // checkpoint that MappedRDD
+ val coalesced = new CoalescedRDD(ones, 2)
+ val splitBeforeCheckpoint =
+ serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
+ coalesced.count() // do the checkpointing
+ val splitAfterCheckpoint =
+ serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
+ assert(
+ splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head,
+ "CoalescedRDDPartition.parents not updated after parent RDD checkpointed"
+ )
+ }
+
+ test("CoGroupedRDD") {
+ val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD()
+ testCheckpointing(rdd => {
+ CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
+ }, false, true)
+
+ val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD()
+ testParentCheckpointing(rdd => {
+ CheckpointSuite.cogroup(
+ longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
+ }, false, true)
+ }
+
+ test("ZippedRDD") {
+ testCheckpointing(
+ rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
+
+    // Test whether the ZippedRDD reduces in serialized size after the parent RDD is checkpointed.
+    // The current implementation of ZippedRDD's partitions has transient references to the parent RDDs,
+    // so only the RDD will reduce in serialized size, not the partitions.
+ testParentCheckpointing(
+ rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
+ }
+
+ test("CheckpointRDD with zero partitions") {
+ val rdd = new BlockRDD[Int](sc, Array[String]())
+ assert(rdd.partitions.size === 0)
+ assert(rdd.isCheckpointed === false)
+ rdd.checkpoint()
+ assert(rdd.count() === 0)
+ assert(rdd.isCheckpointed === true)
+ assert(rdd.partitions.size === 0)
+ }
+
+ /**
+   * Test checkpointing of the final RDD generated by the given operation. By default,
+   * this method tests whether the serialized size of the RDD has reduced after checkpointing.
+   * It can also test whether the serialized size of the RDD's partitions has reduced, but this
+   * is not done by default because the partitions usually do not refer to any RDD and
+   * therefore never store the lineage.
+ */
+ def testCheckpointing[U: ClassManifest](
+ op: (RDD[Int]) => RDD[U],
+ testRDDSize: Boolean = true,
+ testRDDPartitionSize: Boolean = false
+ ) {
+ // Generate the final RDD using given RDD operation
+ val baseRDD = generateLongLineageRDD()
+ val operatedRDD = op(baseRDD)
+ val parentRDD = operatedRDD.dependencies.headOption.orNull
+ val rddType = operatedRDD.getClass.getSimpleName
+ val numPartitions = operatedRDD.partitions.length
+
+ // Find serialized sizes before and after the checkpoint
+ val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+ operatedRDD.checkpoint()
+ val result = operatedRDD.collect()
+ val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+
+ // Test whether the checkpoint file has been created
+ assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
+
+ // Test whether dependencies have been changed from its earlier parent RDD
+ assert(operatedRDD.dependencies.head.rdd != parentRDD)
+
+ // Test whether the partitions have been changed to the new Hadoop partitions
+ assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
+
+ // Test whether the number of partitions is same as before
+ assert(operatedRDD.partitions.length === numPartitions)
+
+ // Test whether the data in the checkpointed RDD is same as original
+ assert(operatedRDD.collect() === result)
+
+    // Test whether the serialized size of the RDD has reduced. If the RDD
+    // does not have any dependency on another RDD (e.g., ParallelCollection,
+    // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
+ if (testRDDSize) {
+ logInfo("Size of " + rddType +
+ "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
+ assert(
+ rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
+ "Size of " + rddType + " did not reduce after checkpointing " +
+ "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
+ )
+ }
+
+    // Test whether the serialized size of the partitions has reduced. If the partitions
+    // do not have any non-transient reference to another RDD or another RDD's partitions, they
+    // do not refer to a lineage and therefore may not reduce in size after checkpointing.
+    // However, if the original partitions before checkpointing did refer to a parent RDD, the partitions
+    // must be forgotten after checkpointing (to remove all references to parent RDDs) and
+    // replaced with the HadoopPartitions of the checkpointed RDD.
+ if (testRDDPartitionSize) {
+ logInfo("Size of " + rddType + " partitions "
+ + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
+ assert(
+ splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
+ "Size of " + rddType + " partitions did not reduce after checkpointing " +
+ "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
+ )
+ }
+ }
+
+ /**
+   * Test whether checkpointing of the parent of the generated RDD also
+   * truncates the lineage or not. Some RDDs, like CoGroupedRDD, hold on to their parent
+   * RDDs' partitions. So even if the parent RDD is checkpointed and its partitions changed,
+   * such an RDD will remember the old partitions and therefore potentially the whole lineage.
+ */
+ def testParentCheckpointing[U: ClassManifest](
+ op: (RDD[Int]) => RDD[U],
+ testRDDSize: Boolean,
+ testRDDPartitionSize: Boolean
+ ) {
+ // Generate the final RDD using given RDD operation
+ val baseRDD = generateLongLineageRDD()
+ val operatedRDD = op(baseRDD)
+ val parentRDD = operatedRDD.dependencies.head.rdd
+ val rddType = operatedRDD.getClass.getSimpleName
+ val parentRDDType = parentRDD.getClass.getSimpleName
+
+ // Get the partitions and dependencies of the parent in case they're lazily computed
+ parentRDD.dependencies
+ parentRDD.partitions
+
+ // Find serialized sizes before and after the checkpoint
+ val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+ parentRDD.checkpoint() // checkpoint the parent RDD, not the generated one
+ val result = operatedRDD.collect()
+ val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+
+ // Test whether the data in the checkpointed RDD is same as original
+ assert(operatedRDD.collect() === result)
+
+    // Test whether the serialized size of the RDD has reduced because its parent was
+    // checkpointed. If this RDD or its parent RDD does not have any dependency
+    // on another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may
+    // not reduce in size after checkpointing.
+ if (testRDDSize) {
+ assert(
+ rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
+ "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType +
+ "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
+ )
+ }
+
+    // Test whether the serialized size of the partitions has reduced because the parent was
+    // checkpointed. If the partitions do not have any non-transient reference to another RDD
+    // or another RDD's partitions, they do not refer to a lineage and therefore may not reduce
+    // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent
+    // RDD, then these partitions must update their references to the parent RDD's partitions, as those
+    // partitions must have changed after checkpointing.
+ if (testRDDPartitionSize) {
+ assert(
+ splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
+ "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType +
+ "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
+ )
+ }
+
+ }
+
+ /**
+ * Generate an RDD with a long lineage of one-to-one dependencies.
+ */
+ def generateLongLineageRDD(): RDD[Int] = {
+ var rdd = sc.makeRDD(1 to 100, 4)
+ for (i <- 1 to 50) {
+ rdd = rdd.map(x => x + 1)
+ }
+ rdd
+ }
+
+ /**
+ * Generate an RDD with a long lineage specifically for CoGroupedRDD.
+   * A CoGroupedRDD can have a long lineage only if one of its parents has a long lineage
+   * and a narrow dependency with this RDD. This method generates such an RDD through a sequence
+   * of cogroups and mapValues, which creates a long lineage of narrow dependencies.
+ */
+ def generateLongLineageRDDForCoGroupedRDD() = {
+ val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
+
+ def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
+
+ var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones)
+ for(i <- 1 to 10) {
+ cogrouped = cogrouped.mapValues(add).cogroup(ones)
+ }
+ cogrouped.mapValues(add)
+ }
+
+ /**
+ * Get serialized sizes of the RDD and its partitions, in order to test whether the size shrinks
+ * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
+ */
+ def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
+ (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length,
+ Utils.serialize(rdd.partitions).length)
+ }
+
+ /**
+ * Serialize and deserialize an object. This is useful to verify the objects
+ * contents after deserialization (e.g., the contents of an RDD split after
+ * it is sent to a slave along with a task)
+ */
+ def serializeDeserialize[T](obj: T): T = {
+ val bytes = Utils.serialize(obj)
+ Utils.deserialize[T](bytes)
+ }
+}
+
+
+object CheckpointSuite {
+ // This is a custom cogroup function that does not use mapValues like
+ // the PairRDDFunctions.cogroup()
+ def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
+ //println("First = " + first + ", second = " + second)
+ new CoGroupedRDD[K](
+ Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
+ part
+ ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
+ }
+
+}
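
CheckpointSuite drives the same sequence a user program follows: point SparkContext at a checkpoint directory, mark an RDD with checkpoint(), and run an action so the checkpoint is materialized and the lineage truncated. A short sketch of that flow is below; the master URL and the checkpoint directory path are illustrative assumptions (the tests use a temporary directory instead).

    import org.apache.spark.SparkContext

    object CheckpointSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "checkpoint-sketch")
        sc.setCheckpointDir("/tmp/spark-checkpoints")   // assumed path; must be reachable by all workers

        // Build a long lineage of one-to-one dependencies, as generateLongLineageRDD does above.
        var rdd = sc.makeRDD(1 to 100, 4)
        for (i <- 1 to 50) {
          rdd = rdd.map(x => x + 1)
        }

        rdd.checkpoint()   // only marks the RDD; nothing is written yet
        rdd.count()        // the first action materializes the checkpoint and truncates the lineage
        println("checkpointed: " + rdd.isCheckpointed + ", file: " + rdd.getCheckpointFile)
        sc.stop()
      }
    }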
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala
new file mode 100644
index 0000000..8494899
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/ClosureCleanerSuite.scala
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.NotSerializableException
+
+import org.scalatest.FunSuite
+import org.apache.spark.LocalSparkContext._
+import SparkContext._
+
+class ClosureCleanerSuite extends FunSuite {
+ test("closures inside an object") {
+ assert(TestObject.run() === 30) // 6 + 7 + 8 + 9
+ }
+
+ test("closures inside a class") {
+ val obj = new TestClass
+ assert(obj.run() === 30) // 6 + 7 + 8 + 9
+ }
+
+ test("closures inside a class with no default constructor") {
+ val obj = new TestClassWithoutDefaultConstructor(5)
+ assert(obj.run() === 30) // 6 + 7 + 8 + 9
+ }
+
+ test("closures that don't use fields of the outer class") {
+ val obj = new TestClassWithoutFieldAccess
+ assert(obj.run() === 30) // 6 + 7 + 8 + 9
+ }
+
+ test("nested closures inside an object") {
+ assert(TestObjectWithNesting.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
+ }
+
+ test("nested closures inside a class") {
+ val obj = new TestClassWithNesting(1)
+ assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
+ }
+}
+
+// A non-serializable class we create in closures to make sure that we aren't
+// keeping references to unneeded variables from our outer closures.
+class NonSerializable {}
+
+object TestObject {
+ def run(): Int = {
+ var nonSer = new NonSerializable
+ var x = 5
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ nums.map(_ + x).reduce(_ + _)
+ }
+ }
+}
+
+class TestClass extends Serializable {
+ var x = 5
+
+ def getX = x
+
+ def run(): Int = {
+ var nonSer = new NonSerializable
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ nums.map(_ + getX).reduce(_ + _)
+ }
+ }
+}
+
+class TestClassWithoutDefaultConstructor(x: Int) extends Serializable {
+ def getX = x
+
+ def run(): Int = {
+ var nonSer = new NonSerializable
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ nums.map(_ + getX).reduce(_ + _)
+ }
+ }
+}
+
+// This class is not serializable, but we aren't using any of its fields in our
+// closures, so they won't have a $outer pointing to it and should still work.
+class TestClassWithoutFieldAccess {
+ var nonSer = new NonSerializable
+
+ def run(): Int = {
+ var nonSer2 = new NonSerializable
+ var x = 5
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ nums.map(_ + x).reduce(_ + _)
+ }
+ }
+}
+
+
+object TestObjectWithNesting {
+ def run(): Int = {
+ var nonSer = new NonSerializable
+ var answer = 0
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ var y = 1
+ for (i <- 1 to 4) {
+ var nonSer2 = new NonSerializable
+ var x = i
+ answer += nums.map(_ + x + y).reduce(_ + _)
+ }
+ answer
+ }
+ }
+}
+
+class TestClassWithNesting(val y: Int) extends Serializable {
+ def getY = y
+
+ def run(): Int = {
+ var nonSer = new NonSerializable
+ var answer = 0
+ return withSpark(new SparkContext("local", "test")) { sc =>
+ val nums = sc.parallelize(Array(1, 2, 3, 4))
+ for (i <- 1 to 4) {
+ var nonSer2 = new NonSerializable
+ var x = i
+ answer += nums.map(_ + x + getY).reduce(_ + _)
+ }
+ answer
+ }
+ }
+}
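
ClosureCleanerSuite verifies that closures referencing an enclosing object or class can still be shipped to tasks. A common complementary idiom in user code, shown here only as a hedged sketch with illustrative class and field names, is to copy the needed field into a local val so the closure captures the value rather than the enclosing (possibly non-serializable) instance.

    import org.apache.spark.SparkContext

    // Deliberately not Serializable, like the NonSerializable helper above.
    class Multiplier(val factor: Int) {
      def scale(sc: SparkContext): Int = {
        val localFactor = factor   // copy the field into a local val
        // The closure below captures only localFactor, not `this`, so the task can be
        // serialized even though Multiplier itself is not Serializable.
        sc.parallelize(1 to 10).map(x => x * localFactor).reduce(_ + _)
      }
    }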
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/DistributedSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
new file mode 100644
index 0000000..7a856d4
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import network.ConnectionManagerId
+import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.Timeouts._
+import org.scalatest.matchers.ShouldMatchers
+import org.scalatest.prop.Checkers
+import org.scalatest.time.{Span, Millis}
+import org.scalacheck.Arbitrary._
+import org.scalacheck.Gen
+import org.scalacheck.Prop._
+import org.eclipse.jetty.server.{Server, Request, Handler}
+
+import com.google.common.io.Files
+
+import scala.collection.mutable.ArrayBuffer
+
+import SparkContext._
+import storage.{GetBlock, BlockManagerWorker, StorageLevel}
+import ui.JettyUtils
+
+
+class NotSerializableClass
+class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
+
+
+class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
+ with LocalSparkContext {
+
+ val clusterUrl = "local-cluster[2,1,512]"
+
+ after {
+ System.clearProperty("spark.reducer.maxMbInFlight")
+ System.clearProperty("spark.storage.memoryFraction")
+ }
+
+ test("task throws not serializable exception") {
+ // Ensures that executors do not crash when an exn is not serializable. If executors crash,
+ // this test will hang. Correct behavior is that executors don't crash but fail tasks
+ // and the scheduler throws a SparkException.
+
+ // numSlaves must be less than numPartitions
+ val numSlaves = 3
+ val numPartitions = 10
+
+ sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
+ val data = sc.parallelize(1 to 100, numPartitions).
+ map(x => throw new NotSerializableExn(new NotSerializableClass))
+ intercept[SparkException] {
+ data.count()
+ }
+ resetSparkContext()
+ }
+
+ test("local-cluster format") {
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ assert(sc.parallelize(1 to 2, 2).count() == 2)
+ resetSparkContext()
+ sc = new SparkContext("local-cluster[2 , 1 , 512]", "test")
+ assert(sc.parallelize(1 to 2, 2).count() == 2)
+ resetSparkContext()
+ sc = new SparkContext("local-cluster[2, 1, 512]", "test")
+ assert(sc.parallelize(1 to 2, 2).count() == 2)
+ resetSparkContext()
+ sc = new SparkContext("local-cluster[ 2, 1, 512 ]", "test")
+ assert(sc.parallelize(1 to 2, 2).count() == 2)
+ resetSparkContext()
+ }
+
+ test("simple groupByKey") {
+ sc = new SparkContext(clusterUrl, "test")
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
+ val groups = pairs.groupByKey(5).collect()
+ assert(groups.size === 2)
+ val valuesFor1 = groups.find(_._1 == 1).get._2
+ assert(valuesFor1.toList.sorted === List(1, 2, 3))
+ val valuesFor2 = groups.find(_._1 == 2).get._2
+ assert(valuesFor2.toList.sorted === List(1))
+ }
+
+ test("groupByKey where map output sizes exceed maxMbInFlight") {
+ System.setProperty("spark.reducer.maxMbInFlight", "1")
+ sc = new SparkContext(clusterUrl, "test")
+ // This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
+ // file should be about 2.5 MB
+ val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
+ val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
+ assert(groups.length === 16)
+ assert(groups.map(_._2).sum === 2000)
+ // Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
+ }
+
+ test("accumulators") {
+ sc = new SparkContext(clusterUrl, "test")
+ val accum = sc.accumulator(0)
+ sc.parallelize(1 to 10, 10).foreach(x => accum += x)
+ assert(accum.value === 55)
+ }
+
+ test("broadcast variables") {
+ sc = new SparkContext(clusterUrl, "test")
+ val array = new Array[Int](100)
+ val bv = sc.broadcast(array)
+ array(2) = 3 // Change the array -- this should not be seen on workers
+ val rdd = sc.parallelize(1 to 10, 10)
+ val sum = rdd.map(x => bv.value.sum).reduce(_ + _)
+ assert(sum === 0)
+ }
+
+ test("repeatedly failing task") {
+ sc = new SparkContext(clusterUrl, "test")
+ val accum = sc.accumulator(0)
+ val thrown = intercept[SparkException] {
+ sc.parallelize(1 to 10, 10).foreach(x => println(x / 0))
+ }
+ assert(thrown.getClass === classOf[SparkException])
+ assert(thrown.getMessage.contains("more than 4 times"))
+ }
+
+ test("caching") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).cache()
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching on disk") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory, serialized, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_ONLY_SER_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching on disk, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.DISK_ONLY_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory and disk, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_2)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ }
+
+ test("caching in memory and disk, serialized, replicated") {
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(1 to 1000, 10).persist(StorageLevel.MEMORY_AND_DISK_SER_2)
+
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+ assert(data.count() === 1000)
+
+ // Get all the locations of the first partition and try to fetch the partitions
+ // from those locations.
+ val blockIds = data.partitions.indices.map(index => "rdd_%d_%d".format(data.id, index)).toArray
+ val blockId = blockIds(0)
+ val blockManager = SparkEnv.get.blockManager
+ blockManager.master.getLocations(blockId).foreach(id => {
+ val bytes = BlockManagerWorker.syncGetBlock(
+ GetBlock(blockId), ConnectionManagerId(id.host, id.port))
+ val deserialized = blockManager.dataDeserialize(blockId, bytes).asInstanceOf[Iterator[Int]].toList
+ assert(deserialized === (1 to 100).toList)
+ })
+ }
+
+ test("compute without caching when no partitions fit in memory") {
+ System.setProperty("spark.storage.memoryFraction", "0.0001")
+ sc = new SparkContext(clusterUrl, "test")
+ // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
+ // to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
+ val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ System.clearProperty("spark.storage.memoryFraction")
+ }
+
+ test("compute when only some partitions fit in memory") {
+ System.setProperty("spark.storage.memoryFraction", "0.01")
+ sc = new SparkContext(clusterUrl, "test")
+ // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
+ // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
+ // to make sure that *some* of them do fit though
+ val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ assert(data.count() === 4000000)
+ System.clearProperty("spark.storage.memoryFraction")
+ }
+
+ test("passing environment variables to cluster") {
+ sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE"))
+ val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
+ assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
+ }
+
+ test("recover from node failures") {
+ import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+ DistributedSuite.amMaster = true
+ sc = new SparkContext(clusterUrl, "test")
+ val data = sc.parallelize(Seq(true, true), 2)
+ assert(data.count === 2) // force executors to start
+ assert(data.map(markNodeIfIdentity).collect.size === 2)
+ assert(data.map(failOnMarkedIdentity).collect.size === 2)
+ }
+
+ test("recover from repeated node failures during shuffle-map") {
+ import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+ DistributedSuite.amMaster = true
+ sc = new SparkContext(clusterUrl, "test")
+ for (i <- 1 to 3) {
+ val data = sc.parallelize(Seq(true, false), 2)
+ assert(data.count === 2)
+ assert(data.map(markNodeIfIdentity).collect.size === 2)
+ assert(data.map(failOnMarkedIdentity).map(x => x -> x).groupByKey.count === 2)
+ }
+ }
+
+ test("recover from repeated node failures during shuffle-reduce") {
+ import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+ DistributedSuite.amMaster = true
+ sc = new SparkContext(clusterUrl, "test")
+ for (i <- 1 to 3) {
+ val data = sc.parallelize(Seq(true, true), 2)
+ assert(data.count === 2)
+ assert(data.map(markNodeIfIdentity).collect.size === 2)
+ // This relies on mergeCombiners being used to perform the actual reduce for this
+ // test to actually be testing what it claims.
+ val grouped = data.map(x => x -> x).combineByKey(
+ x => x,
+ (x: Boolean, y: Boolean) => x,
+ (x: Boolean, y: Boolean) => failOnMarkedIdentity(x)
+ )
+ assert(grouped.collect.size === 1)
+ }
+ }
+
+ test("recover from node failures with replication") {
+ import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+ DistributedSuite.amMaster = true
+    // Use more than two nodes so that the communication pattern is not symmetric and we
+    // might cache a partially correct list of peers.
+ sc = new SparkContext("local-cluster[3,1,512]", "test")
+ for (i <- 1 to 3) {
+ val data = sc.parallelize(Seq(true, false, false, false), 4)
+ data.persist(StorageLevel.MEMORY_ONLY_2)
+
+ assert(data.count === 4)
+ assert(data.map(markNodeIfIdentity).collect.size === 4)
+ assert(data.map(failOnMarkedIdentity).collect.size === 4)
+
+ // Create a new replicated RDD to make sure that cached peer information doesn't cause
+ // problems.
+ val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
+ assert(data2.count === 2)
+ }
+ }
+
+ test("unpersist RDDs") {
+ DistributedSuite.amMaster = true
+ sc = new SparkContext("local-cluster[3,1,512]", "test")
+ val data = sc.parallelize(Seq(true, false, false, false), 4)
+ data.persist(StorageLevel.MEMORY_ONLY_2)
+ data.count
+ assert(sc.persistentRdds.isEmpty === false)
+ data.unpersist()
+ assert(sc.persistentRdds.isEmpty === true)
+
+ failAfter(Span(3000, Millis)) {
+ try {
+ while (! sc.getRDDStorageInfo.isEmpty) {
+ Thread.sleep(200)
+ }
+ } catch {
+ case _ => { Thread.sleep(10) }
+ // Do nothing. We might see exceptions because block manager
+ // is racing this thread to remove entries from the driver.
+ }
+ }
+ }
+
+ test("job should fail if TaskResult exceeds Akka frame size") {
+ // We must use local-cluster mode since results are returned differently
+ // when running under LocalScheduler:
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ val akkaFrameSize =
+ sc.env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size").toInt
+ val rdd = sc.parallelize(Seq(1)).map{x => new Array[Byte](akkaFrameSize)}
+ val exception = intercept[SparkException] {
+ rdd.reduce((x, y) => x)
+ }
+ exception.getMessage should endWith("result exceeded Akka frame size")
+ }
+}
+
+object DistributedSuite {
+ // Indicates whether this JVM is marked for failure.
+ var mark = false
+
+ // Set by test to remember if we are in the driver program so we can assert
+ // that we are not.
+ var amMaster = false
+
+ // Act like an identity function, but if the argument is true, set mark to true.
+ def markNodeIfIdentity(item: Boolean): Boolean = {
+ if (item) {
+ assert(!amMaster)
+ mark = true
+ }
+ item
+ }
+
+ // Act like an identity function, but if mark was set to true previously, fail,
+ // crashing the entire JVM.
+ def failOnMarkedIdentity(item: Boolean): Boolean = {
+ if (mark) {
+ System.exit(42)
+ }
+ item
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/DriverSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/DriverSuite.scala b/core/src/test/scala/org/apache/spark/DriverSuite.scala
new file mode 100644
index 0000000..b08aad1
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/DriverSuite.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.File
+
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Timeouts
+import org.scalatest.prop.TableDrivenPropertyChecks._
+import org.scalatest.time.SpanSugar._
+
+class DriverSuite extends FunSuite with Timeouts {
+ test("driver should exit after finishing") {
+ assert(System.getenv("SPARK_HOME") != null)
+ // Regression test for SPARK-530: "Spark driver process doesn't exit after finishing"
+ val masters = Table(("master"), ("local"), ("local-cluster[2,1,512]"))
+ forAll(masters) { (master: String) =>
+ failAfter(30 seconds) {
+ Utils.execute(Seq("./spark-class", "org.apache.spark.DriverWithoutCleanup", master),
+ new File(System.getenv("SPARK_HOME")))
+ }
+ }
+ }
+}
+
+/**
+ * Program that creates a Spark driver but doesn't call SparkContext.stop() or
+ * Sys.exit() after finishing.
+ */
+object DriverWithoutCleanup {
+ def main(args: Array[String]) {
+ Logger.getRootLogger().setLevel(Level.WARN)
+ val sc = new SparkContext(args(0), "DriverWithoutCleanup")
+ sc.parallelize(1 to 100, 4).count()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/FailureSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
new file mode 100644
index 0000000..ee89a7a
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.scalatest.FunSuite
+
+import SparkContext._
+
+// Common state shared by FailureSuite-launched tasks. We use a global object
+// for this because any local variables used in the task closures will rightfully
+// be copied for each task, so there's no other way for them to share state.
+object FailureSuiteState {
+ var tasksRun = 0
+ var tasksFailed = 0
+
+ def clear() {
+ synchronized {
+ tasksRun = 0
+ tasksFailed = 0
+ }
+ }
+}
+
+class FailureSuite extends FunSuite with LocalSparkContext {
+
+ // Run a 3-task map job in which task 1 deterministically fails once, and check
+ // whether the job completes successfully and we ran 4 tasks in total.
+ test("failure in a single-stage job") {
+ sc = new SparkContext("local[1,1]", "test")
+ val results = sc.makeRDD(1 to 3, 3).map { x =>
+ FailureSuiteState.synchronized {
+ FailureSuiteState.tasksRun += 1
+ if (x == 1 && FailureSuiteState.tasksFailed == 0) {
+ FailureSuiteState.tasksFailed += 1
+ throw new Exception("Intentional task failure")
+ }
+ }
+ x * x
+ }.collect()
+ FailureSuiteState.synchronized {
+ assert(FailureSuiteState.tasksRun === 4)
+ }
+ assert(results.toList === List(1,4,9))
+ FailureSuiteState.clear()
+ }
+
+ // Run a map-reduce job in which a reduce task deterministically fails once.
+ test("failure in a two-stage job") {
+ sc = new SparkContext("local[1,1]", "test")
+ val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
+ case (k, v) =>
+ FailureSuiteState.synchronized {
+ FailureSuiteState.tasksRun += 1
+ if (k == 1 && FailureSuiteState.tasksFailed == 0) {
+ FailureSuiteState.tasksFailed += 1
+ throw new Exception("Intentional task failure")
+ }
+ }
+ (k, v(0) * v(0))
+ }.collect()
+ FailureSuiteState.synchronized {
+ assert(FailureSuiteState.tasksRun === 4)
+ }
+ assert(results.toSet === Set((1, 1), (2, 4), (3, 9)))
+ FailureSuiteState.clear()
+ }
+
+ test("failure because task results are not serializable") {
+ sc = new SparkContext("local[1,1]", "test")
+ val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
+
+ val thrown = intercept[SparkException] {
+ results.collect()
+ }
+ assert(thrown.getClass === classOf[SparkException])
+ assert(thrown.getMessage.contains("NotSerializableException"))
+
+ FailureSuiteState.clear()
+ }
+
+ test("failure because task closure is not serializable") {
+ sc = new SparkContext("local[1,1]", "test")
+ val a = new NonSerializable
+
+ // Non-serializable closure in the final result stage
+ val thrown = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).map(x => a).count()
+ }
+ assert(thrown.getClass === classOf[SparkException])
+ assert(thrown.getMessage.contains("NotSerializableException"))
+
+ // Non-serializable closure in an earlier stage
+ val thrown1 = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).map(x => (x, a)).partitionBy(new HashPartitioner(3)).count()
+ }
+ assert(thrown1.getClass === classOf[SparkException])
+ assert(thrown1.getMessage.contains("NotSerializableException"))
+
+ // Non-serializable closure in foreach function
+ val thrown2 = intercept[SparkException] {
+ sc.parallelize(1 to 10, 2).foreach(x => println(a))
+ }
+ assert(thrown2.getClass === classOf[SparkException])
+ assert(thrown2.getMessage.contains("NotSerializableException"))
+
+ FailureSuiteState.clear()
+ }
+
+ // TODO: Need to add tests with shuffle fetch failures.
+}
+
+
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/FileServerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
new file mode 100644
index 0000000..35d1d41
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import com.google.common.io.Files
+import org.scalatest.FunSuite
+import java.io.{File, PrintWriter, FileReader, BufferedReader}
+import SparkContext._
+
+class FileServerSuite extends FunSuite with LocalSparkContext {
+
+ @transient var tmpFile: File = _
+ @transient var testJarFile: File = _
+
+ override def beforeEach() {
+ super.beforeEach()
+ // Create a sample text file
+ val tmpdir = new File(Files.createTempDir(), "test")
+ tmpdir.mkdir()
+ tmpFile = new File(tmpdir, "FileServerSuite.txt")
+ val pw = new PrintWriter(tmpFile)
+ pw.println("100")
+ pw.close()
+ }
+
+ override def afterEach() {
+ super.afterEach()
+ // Clean up downloaded file
+ if (tmpFile.exists) {
+ tmpFile.delete()
+ }
+ }
+
+ test("Distributing files locally") {
+ sc = new SparkContext("local[4]", "test")
+ sc.addFile(tmpFile.toString)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+ val result = sc.parallelize(testData).reduceByKey {
+ val path = SparkFiles.get("FileServerSuite.txt")
+ val in = new BufferedReader(new FileReader(path))
+ val fileVal = in.readLine().toInt
+ in.close()
+ _ * fileVal + _ * fileVal
+ }.collect()
+ assert(result.toSet === Set((1,200), (2,300), (3,500)))
+ }
+
+ test("Distributing files locally using URL as input") {
+ // addFile("file:///....")
+ sc = new SparkContext("local[4]", "test")
+ sc.addFile(new File(tmpFile.toString).toURI.toString)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+ val result = sc.parallelize(testData).reduceByKey {
+ val path = SparkFiles.get("FileServerSuite.txt")
+ val in = new BufferedReader(new FileReader(path))
+ val fileVal = in.readLine().toInt
+ in.close()
+ _ * fileVal + _ * fileVal
+ }.collect()
+ assert(result.toSet === Set((1,200), (2,300), (3,500)))
+ }
+
+ test ("Dynamically adding JARS locally") {
+ sc = new SparkContext("local[4]", "test")
+ val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+ sc.addJar(sampleJarFile)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
+ val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ a + b
+ }.collect()
+ assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ }
+
+ test("Distributing files on a standalone cluster") {
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ sc.addFile(tmpFile.toString)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
+ val result = sc.parallelize(testData).reduceByKey {
+ val path = SparkFiles.get("FileServerSuite.txt")
+ val in = new BufferedReader(new FileReader(path))
+ val fileVal = in.readLine().toInt
+ in.close()
+ _ * fileVal + _ * fileVal
+ }.collect()
+ assert(result.toSet === Set((1,200), (2,300), (3,500)))
+ }
+
+ test ("Dynamically adding JARS on a standalone cluster") {
+ sc = new SparkContext("local-cluster[1,1,512]", "test")
+ val sampleJarFile = getClass.getClassLoader.getResource("uncommons-maths-1.2.2.jar").getFile()
+ sc.addJar(sampleJarFile)
+ val testData = Array((1,1), (1,1), (2,1), (3,5), (2,3), (3,0))
+ val result = sc.parallelize(testData).reduceByKey { (x,y) =>
+ val fac = Thread.currentThread.getContextClassLoader()
+ .loadClass("org.uncommons.maths.Maths")
+ .getDeclaredMethod("factorial", classOf[Int])
+ val a = fac.invoke(null, x.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ val b = fac.invoke(null, y.asInstanceOf[java.lang.Integer]).asInstanceOf[Long].toInt
+ a + b
+ }.collect()
+ assert(result.toSet === Set((1,2), (2,7), (3,121)))
+ }
+}
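
The file-distribution tests above pair SparkContext.addFile on the driver with SparkFiles.get inside tasks to locate the node-local copy. A minimal sketch of that pairing follows; the master URL and the /tmp/lookup.txt path are assumptions for illustration, and the file is expected to exist on the driver before the job runs.

    import scala.io.Source
    import org.apache.spark.{SparkContext, SparkFiles}

    object AddFileSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[2]", "addfile-sketch")
        sc.addFile("/tmp/lookup.txt")   // assumed driver-side path; shipped to every node
        val lineCounts = sc.parallelize(1 to 4).map { _ =>
          // SparkFiles.get resolves the local copy that the file server delivered to this node.
          val path = SparkFiles.get("lookup.txt")
          Source.fromFile(path).getLines().size
        }.collect()
        println(lineCounts.mkString(","))
        sc.stop()
      }
    }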
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/org/apache/spark/FileSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
new file mode 100644
index 0000000..7b82a4c
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import java.io.{FileWriter, PrintWriter, File}
+
+import scala.io.Source
+
+import com.google.common.io.Files
+import org.scalatest.FunSuite
+import org.apache.hadoop.io._
+import org.apache.hadoop.io.compress.{DefaultCodec, CompressionCodec, GzipCodec}
+
+
+import SparkContext._
+
+class FileSuite extends FunSuite with LocalSparkContext {
+
+ test("text files") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 4)
+ nums.saveAsTextFile(outputDir)
+ // Read the plain text file and check it's OK
+ val outputFile = new File(outputDir, "part-00000")
+ val content = Source.fromFile(outputFile).mkString
+ assert(content === "1\n2\n3\n4\n")
+ // Also try reading it in as a text file RDD
+ assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
+ }
+
+ test("text files (compressed)") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+ val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
+ val codec = new DefaultCodec()
+
+ val data = sc.parallelize("a" * 10000, 1)
+ data.saveAsTextFile(normalDir)
+ data.saveAsTextFile(compressedOutputDir, classOf[DefaultCodec])
+
+ val normalFile = new File(normalDir, "part-00000")
+ val normalContent = sc.textFile(normalDir).collect
+ assert(normalContent === Array.fill(10000)("a"))
+
+ val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+ val compressedContent = sc.textFile(compressedOutputDir).collect
+ assert(compressedContent === Array.fill(10000)("a"))
+
+ assert(compressedFile.length < normalFile.length)
+ }
+
+ test("SequenceFiles") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
+ nums.saveAsSequenceFile(outputDir)
+ // Try reading the output back as a SequenceFile
+ val output = sc.sequenceFile[IntWritable, Text](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ }
+
+ test("SequenceFile (compressed)") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val normalDir = new File(tempDir, "output_normal").getAbsolutePath
+ val compressedOutputDir = new File(tempDir, "output_compressed").getAbsolutePath
+ val codec = new DefaultCodec()
+
+ val data = sc.parallelize(Seq.fill(100)("abc"), 1).map(x => (x, x))
+ data.saveAsSequenceFile(normalDir)
+ data.saveAsSequenceFile(compressedOutputDir, Some(classOf[DefaultCodec]))
+
+ val normalFile = new File(normalDir, "part-00000")
+ val normalContent = sc.sequenceFile[String, String](normalDir).collect
+ assert(normalContent === Array.fill(100)("abc", "abc"))
+
+ val compressedFile = new File(compressedOutputDir, "part-00000" + codec.getDefaultExtension)
+ val compressedContent = sc.sequenceFile[String, String](compressedOutputDir).collect
+ assert(compressedContent === Array.fill(100)("abc", "abc"))
+
+ assert(compressedFile.length < normalFile.length)
+ }
+
+ test("SequenceFile with writable key") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
+ nums.saveAsSequenceFile(outputDir)
+ // Try reading the output back as a SequenceFile
+ val output = sc.sequenceFile[IntWritable, Text](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ }
+
+ test("SequenceFile with writable value") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
+ nums.saveAsSequenceFile(outputDir)
+ // Try reading the output back as a SequenceFile
+ val output = sc.sequenceFile[IntWritable, Text](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ }
+
+ test("SequenceFile with writable key and value") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
+ nums.saveAsSequenceFile(outputDir)
+ // Try reading the output back as a SequenceFile
+ val output = sc.sequenceFile[IntWritable, Text](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ }
+
+ test("implicit conversions in reading SequenceFiles") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
+ nums.saveAsSequenceFile(outputDir)
+ // Similar to the tests above, we read a SequenceFile, but this time we pass type params
+ // that are convertible to Writable instead of calling sequenceFile[IntWritable, Text]
+ val output1 = sc.sequenceFile[Int, String](outputDir)
+ assert(output1.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
+ // Also try having one type be a subclass of Writable and one not
+ val output2 = sc.sequenceFile[Int, Text](outputDir)
+ assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ val output3 = sc.sequenceFile[IntWritable, String](outputDir)
+ assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ }
+
+ test("object files of ints") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 4)
+ nums.saveAsObjectFile(outputDir)
+ // Try reading the output back as an object file
+ val output = sc.objectFile[Int](outputDir)
+ assert(output.collect().toList === List(1, 2, 3, 4))
+ }
+
+ test("object files of complex types") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
+ nums.saveAsObjectFile(outputDir)
+ // Try reading the output back as an object file
+ val output = sc.objectFile[(Int, String)](outputDir)
+ assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
+ }
+
+ test("write SequenceFile using new Hadoop API") {
+ import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
+ nums.saveAsNewAPIHadoopFile[SequenceFileOutputFormat[IntWritable, Text]](
+ outputDir)
+ val output = sc.sequenceFile[IntWritable, Text](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ }
+
+ test("read SequenceFile using new Hadoop API") {
+ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val outputDir = new File(tempDir, "output").getAbsolutePath
+ val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
+ nums.saveAsSequenceFile(outputDir)
+ val output =
+ sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
+ assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
+ }
+
+ test("file caching") {
+ sc = new SparkContext("local", "test")
+ val tempDir = Files.createTempDir()
+ val out = new FileWriter(tempDir + "/input")
+ out.write("Hello world!\n")
+ out.write("What's up?\n")
+ out.write("Goodbye\n")
+ out.close()
+ val rdd = sc.textFile(tempDir + "/input").cache()
+ assert(rdd.count() === 3)
+ assert(rdd.count() === 3)
+ assert(rdd.count() === 3)
+ }
+}
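
A minimal sketch (not part of the diff) of the SequenceFile round trip that FileSuite checks: saving a pair RDD with an optional compression codec and reading it back through the Int/String-to-Writable conversions imported from SparkContext._. The output path and master URL are assumptions.

    import org.apache.hadoop.io.compress.DefaultCodec
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._ // pair-RDD functions and Writable conversions

    object SequenceFileSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "sequence-file-sketch")
        val pairs = sc.parallelize(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
        // Pass Some(codec) to compress; omit the argument for an uncompressed file
        pairs.saveAsSequenceFile("/tmp/seq-output", Some(classOf[DefaultCodec]))
        // Read back with primitive/String type parameters instead of IntWritable/Text
        val roundTrip = sc.sequenceFile[Int, String]("/tmp/seq-output").collect()
        println(roundTrip.mkString(", "))
        sc.stop()
      }
    }
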