Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:50 UTC

[06/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
deleted file mode 100644
index b1e0b2b..0000000
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import scala.collection.mutable.ArrayBuffer
-import SparkContext._
-import spark.util.StatCounter
-import scala.math.abs
-
-class PartitioningSuite extends FunSuite with SharedSparkContext {
-
-  test("HashPartitioner equality") {
-    val p2 = new HashPartitioner(2)
-    val p4 = new HashPartitioner(4)
-    val anotherP4 = new HashPartitioner(4)
-    assert(p2 === p2)
-    assert(p4 === p4)
-    assert(p2 != p4)
-    assert(p4 != p2)
-    assert(p4 === anotherP4)
-    assert(anotherP4 === p4)
-  }
-
-  test("RangePartitioner equality") {
-    // Make an RDD where all the elements are the same so that the partition range bounds
-    // are deterministically all the same.
-    val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x))
-
-    val p2 = new RangePartitioner(2, rdd)
-    val p4 = new RangePartitioner(4, rdd)
-    val anotherP4 = new RangePartitioner(4, rdd)
-    val descendingP2 = new RangePartitioner(2, rdd, false)
-    val descendingP4 = new RangePartitioner(4, rdd, false)
-
-    assert(p2 === p2)
-    assert(p4 === p4)
-    assert(p2 != p4)
-    assert(p4 != p2)
-    assert(p4 === anotherP4)
-    assert(anotherP4 === p4)
-    assert(descendingP2 === descendingP2)
-    assert(descendingP4 === descendingP4)
-    assert(descendingP2 != descendingP4)
-    assert(descendingP4 != descendingP2)
-    assert(p2 != descendingP2)
-    assert(p4 != descendingP4)
-    assert(descendingP2 != p2)
-    assert(descendingP4 != p4)
-  }
-
-  test("HashPartitioner not equal to RangePartitioner") {
-    val rdd = sc.parallelize(1 to 10).map(x => (x, x))
-    val rangeP2 = new RangePartitioner(2, rdd)
-    val hashP2 = new HashPartitioner(2)
-    assert(rangeP2 === rangeP2)
-    assert(hashP2 === hashP2)
-    assert(hashP2 != rangeP2)
-    assert(rangeP2 != hashP2)
-  }
-
-  test("partitioner preservation") {
-    val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
-
-    val grouped2 = rdd.groupByKey(2)
-    val grouped4 = rdd.groupByKey(4)
-    val reduced2 = rdd.reduceByKey(_ + _, 2)
-    val reduced4 = rdd.reduceByKey(_ + _, 4)
-
-    assert(rdd.partitioner === None)
-
-    assert(grouped2.partitioner === Some(new HashPartitioner(2)))
-    assert(grouped4.partitioner === Some(new HashPartitioner(4)))
-    assert(reduced2.partitioner === Some(new HashPartitioner(2)))
-    assert(reduced4.partitioner === Some(new HashPartitioner(4)))
-
-    assert(grouped2.groupByKey().partitioner  === grouped2.partitioner)
-    assert(grouped2.groupByKey(3).partitioner !=  grouped2.partitioner)
-    assert(grouped2.groupByKey(2).partitioner === grouped2.partitioner)
-    assert(grouped4.groupByKey().partitioner  === grouped4.partitioner)
-    assert(grouped4.groupByKey(3).partitioner !=  grouped4.partitioner)
-    assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner)
-
-    assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
-    assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
-    assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
-    assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
-
-    assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
-    assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
-    assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
-    assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
-
-    assert(grouped2.map(_ => 1).partitioner === None)
-    assert(grouped2.mapValues(_ => 1).partitioner === grouped2.partitioner)
-    assert(grouped2.flatMapValues(_ => Seq(1)).partitioner === grouped2.partitioner)
-    assert(grouped2.filter(_._1 > 4).partitioner === grouped2.partitioner)
-  }
-
-  test("partitioning Java arrays should fail") {
-    val arrs: RDD[Array[Int]] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => Array(x))
-    val arrPairs: RDD[(Array[Int], Int)] =
-      sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))
-
-    assert(intercept[SparkException]{ arrs.distinct() }.getMessage.contains("array"))
-    // We can't catch all usages of arrays, since they might occur inside other collections:
-    //assert(fails { arrPairs.distinct() })
-    assert(intercept[SparkException]{ arrPairs.partitionBy(new HashPartitioner(2)) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.join(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.leftOuterJoin(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.rightOuterJoin(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.groupByKey() }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.countByKey() }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.countByKeyApprox(1) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.cogroup(arrPairs) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
-    assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
-  }
-
-  test("zero-length partitions should be correctly handled") {
-    // Create RDD with some consecutive empty partitions (including the "first" one)
-    val rdd: RDD[Double] = sc
-        .parallelize(Array(-1.0, -1.0, -1.0, -1.0, 2.0, 4.0, -1.0, -1.0), 8)
-        .filter(_ >= 0.0)
-
-    // Run the partitions, including the consecutive empty ones, through StatCounter
-    val stats: StatCounter = rdd.stats();
-    assert(abs(6.0 - stats.sum) < 0.01);
-    assert(abs(6.0/2 - rdd.mean) < 0.01);
-    assert(abs(1.0 - rdd.variance) < 0.01);
-    assert(abs(1.0 - rdd.stdev) < 0.01);
-
-    // Add other tests here for classes that should be able to handle empty partitions correctly
-  }
-}
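
For context, a minimal sketch (not part of this commit) of the partitioner-preservation behaviour the deleted suite above exercises. It uses only operations that appear in the tests and assumes an active SparkContext `sc`, such as the one SharedSparkContext provides:

    // Both sides are hash-partitioned into 2 partitions up front, so the join
    // can reuse that partitioning; the joined RDD reports the same partitioner.
    val left  = sc.parallelize(1 to 10).map(x => (x, x)).groupByKey(2)
    val right = sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 2)
    assert(left.join(right).partitioner === left.partitioner)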

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
deleted file mode 100644
index 35c0471..0000000
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import SparkContext._
-
-class PipedRDDSuite extends FunSuite with SharedSparkContext {
-
-  test("basic pipe") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-
-    val piped = nums.pipe(Seq("cat"))
-
-    val c = piped.collect()
-    assert(c.size === 4)
-    assert(c(0) === "1")
-    assert(c(1) === "2")
-    assert(c(2) === "3")
-    assert(c(3) === "4")
-  }
-
-  test("advanced pipe") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val bl = sc.broadcast(List("0"))
-
-    val piped = nums.pipe(Seq("cat"),
-      Map[String, String](),
-      (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
-      (i:Int, f: String=> Unit) => f(i + "_"))
-
-    val c = piped.collect()
-
-    assert(c.size === 8)
-    assert(c(0) === "0")
-    assert(c(1) === "\u0001")
-    assert(c(2) === "1_")
-    assert(c(3) === "2_")
-    assert(c(4) === "0")
-    assert(c(5) === "\u0001")
-    assert(c(6) === "3_")
-    assert(c(7) === "4_")
-
-    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
-    val d = nums1.groupBy(str=>str.split("\t")(0)).
-      pipe(Seq("cat"),
-           Map[String, String](),
-           (f: String => Unit) => {bl.value.map(f(_));f("\u0001")},
-           (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect()
-    assert(d.size === 8)
-    assert(d(0) === "0")
-    assert(d(1) === "\u0001")
-    assert(d(2) === "b\t2_")
-    assert(d(3) === "b\t4_")
-    assert(d(4) === "0")
-    assert(d(5) === "\u0001")
-    assert(d(6) === "a\t1_")
-    assert(d(7) === "a\t3_")
-  }
-
-  test("pipe with env variable") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
-    val c = piped.collect()
-    assert(c.size === 2)
-    assert(c(0) === "LALALA")
-    assert(c(1) === "LALALA")
-  }
-
-  test("pipe with non-zero exit status") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val piped = nums.pipe(Seq("cat nonexistent_file", "2>", "/dev/null"))
-    intercept[SparkException] {
-      piped.collect()
-    }
-  }
-
-}
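
For context, a minimal sketch (not part of this commit) of the pipe semantics the deleted suite above exercises: each partition's elements are written, one per line, to the external command's stdin, and each line of its stdout becomes an element of the piped RDD. `GREETING` is only an illustrative environment-variable name; an active SparkContext `sc` is assumed:

    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
    // Two partitions => the external command runs once per partition.
    assert(nums.pipe(Seq("cat")).collect().toList === List("1", "2", "3", "4"))
    // Extra environment variables can be passed to the external process.
    val env = nums.pipe(Seq("printenv", "GREETING"), Map("GREETING" -> "hi"))
    assert(env.collect().toList === List("hi", "hi"))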

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
deleted file mode 100644
index e306952..0000000
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import scala.collection.mutable.HashMap
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Timeouts._
-import org.scalatest.time.{Span, Millis}
-import spark.SparkContext._
-import spark.rdd._
-import scala.collection.parallel.mutable
-
-class RDDSuite extends FunSuite with SharedSparkContext {
-
-  test("basic operations") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    assert(nums.collect().toList === List(1, 2, 3, 4))
-    val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
-    assert(dups.distinct().count() === 4)
-    assert(dups.distinct.count === 4)  // Can distinct and count be called without parentheses?
-    assert(dups.distinct.collect === dups.distinct().collect)
-    assert(dups.distinct(2).collect === dups.distinct().collect)
-    assert(nums.reduce(_ + _) === 10)
-    assert(nums.fold(0)(_ + _) === 10)
-    assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4"))
-    assert(nums.filter(_ > 2).collect().toList === List(3, 4))
-    assert(nums.flatMap(x => 1 to x).collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
-    assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
-    assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
-    assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
-    assert(nums.keyBy(_.toString).collect().toList === List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
-    val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
-    assert(partitionSums.collect().toList === List(3, 7))
-
-    val partitionSumsWithSplit = nums.mapPartitionsWithSplit {
-      case(split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
-    }
-    assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7)))
-
-    val partitionSumsWithIndex = nums.mapPartitionsWithIndex {
-      case(split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
-    }
-    assert(partitionSumsWithIndex.collect().toList === List((0, 3), (1, 7)))
-
-    intercept[UnsupportedOperationException] {
-      nums.filter(_ > 5).reduce(_ + _)
-    }
-  }
-
-  test("SparkContext.union") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
-    assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
-    assert(sc.union(Seq(nums)).collect().toList === List(1, 2, 3, 4))
-    assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
-  }
-
-  test("aggregate") {
-    val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
-    type StringMap = HashMap[String, Int]
-    val emptyMap = new StringMap {
-      override def default(key: String): Int = 0
-    }
-    val mergeElement: (StringMap, (String, Int)) => StringMap = (map, pair) => {
-      map(pair._1) += pair._2
-      map
-    }
-    val mergeMaps: (StringMap, StringMap) => StringMap = (map1, map2) => {
-      for ((key, value) <- map2) {
-        map1(key) += value
-      }
-      map1
-    }
-    val result = pairs.aggregate(emptyMap)(mergeElement, mergeMaps)
-    assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
-  }
-
-  test("basic caching") {
-    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-    assert(rdd.collect().toList === List(1, 2, 3, 4))
-    assert(rdd.collect().toList === List(1, 2, 3, 4))
-    assert(rdd.collect().toList === List(1, 2, 3, 4))
-  }
-
-  test("caching with failures") {
-    val onlySplit = new Partition { override def index: Int = 0 }
-    var shouldFail = true
-    val rdd = new RDD[Int](sc, Nil) {
-      override def getPartitions: Array[Partition] = Array(onlySplit)
-      override val getDependencies = List[Dependency[_]]()
-      override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
-        if (shouldFail) {
-          throw new Exception("injected failure")
-        } else {
-          return Array(1, 2, 3, 4).iterator
-        }
-      }
-    }.cache()
-    val thrown = intercept[Exception]{
-      rdd.collect()
-    }
-    assert(thrown.getMessage.contains("injected failure"))
-    shouldFail = false
-    assert(rdd.collect().toList === List(1, 2, 3, 4))
-  }
-
-  test("empty RDD") {
-    val empty = new EmptyRDD[Int](sc)
-    assert(empty.count === 0)
-    assert(empty.collect().size === 0)
-
-    val thrown = intercept[UnsupportedOperationException]{
-      empty.reduce(_+_)
-    }
-    assert(thrown.getMessage.contains("empty"))
-
-    val emptyKv = new EmptyRDD[(Int, Int)](sc)
-    val rdd = sc.parallelize(1 to 2, 2).map(x => (x, x))
-    assert(rdd.join(emptyKv).collect().size === 0)
-    assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
-    assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
-    assert(rdd.cogroup(emptyKv).collect().size === 2)
-    assert(rdd.union(emptyKv).collect().size === 2)
-  }
-
-  test("cogrouped RDDs") {
-    val data = sc.parallelize(1 to 10, 10)
-
-    val coalesced1 = data.coalesce(2)
-    assert(coalesced1.collect().toList === (1 to 10).toList)
-    assert(coalesced1.glom().collect().map(_.toList).toList ===
-      List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
-
-    // Check that the narrow dependency is also specified correctly
-    assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(0).toList ===
-      List(0, 1, 2, 3, 4))
-    assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList ===
-      List(5, 6, 7, 8, 9))
-
-    val coalesced2 = data.coalesce(3)
-    assert(coalesced2.collect().toList === (1 to 10).toList)
-    assert(coalesced2.glom().collect().map(_.toList).toList ===
-      List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
-
-    val coalesced3 = data.coalesce(10)
-    assert(coalesced3.collect().toList === (1 to 10).toList)
-    assert(coalesced3.glom().collect().map(_.toList).toList ===
-      (1 to 10).map(x => List(x)).toList)
-
-    // If we try to coalesce into more partitions than the original RDD, it should just
-    // keep the original number of partitions.
-    val coalesced4 = data.coalesce(20)
-    assert(coalesced4.collect().toList === (1 to 10).toList)
-    assert(coalesced4.glom().collect().map(_.toList).toList ===
-      (1 to 10).map(x => List(x)).toList)
-
-    // we can optionally shuffle to keep the upstream parallel
-    val coalesced5 = data.coalesce(1, shuffle = true)
-    assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
-      null)
-  }
-  test("cogrouped RDDs with locality") {
-    val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
-    val coal3 = data3.coalesce(3)
-    val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
-    assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")
-
-    // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
-    val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i+2)).map{ j => "m" + (j%6)})))
-    val coalesced1 = data.coalesce(3)
-    assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
-
-    val splits = coalesced1.glom().collect().map(_.toList).toList
-    assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
-
-    assert(splits.forall(_.length >= 1) === true, "Some partitions were empty")
-
-    // If we try to coalesce into more partitions than the original RDD, it should just
-    // keep the original number of partitions.
-    val coalesced4 = data.coalesce(20)
-    val listOfLists = coalesced4.glom().collect().map(_.toList).toList
-    val sortedList = listOfLists.sortWith{ (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
-    assert( sortedList === (1 to 9).
-      map{x => List(x)}.toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
-  }
-
-  test("cogrouped RDDs with locality, large scale (10K partitions)") {
-    // large scale experiment
-    import collection.mutable
-    val rnd = scala.util.Random
-    val partitions = 10000
-    val numMachines = 50
-    val machines = mutable.ListBuffer[String]()
-    (1 to numMachines).foreach(machines += "m"+_)
-
-    val blocks = (1 to partitions).map(i =>
-    { (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList) } )
-
-    val data2 = sc.makeRDD(blocks)
-    val coalesced2 = data2.coalesce(numMachines*2)
-
-    // test that you get over 90% locality in each group
-    val minLocality = coalesced2.partitions
-      .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
-      .foldLeft(1.)((perc, loc) => math.min(perc,loc))
-    assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
-
-    // test that the groups are load balanced with 100 +/- 20 elements in each
-    val maxImbalance = coalesced2.partitions
-      .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
-      .foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
-    assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
-
-    val data3 = sc.makeRDD(blocks).map(i => i*2) // derived RDD to test *current* pref locs
-    val coalesced3 = data3.coalesce(numMachines*2)
-    val minLocality2 = coalesced3.partitions
-      .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
-      .foldLeft(1.)((perc, loc) => math.min(perc,loc))
-    assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
-      (minLocality2*100.).toInt + "%")
-  }
-
-  test("zipped RDDs") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val zipped = nums.zip(nums.map(_ + 1.0))
-    assert(zipped.glom().map(_.toList).collect().toList ===
-      List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0))))
-
-    intercept[IllegalArgumentException] {
-      nums.zip(sc.parallelize(1 to 4, 1)).collect()
-    }
-  }
-
-  test("partition pruning") {
-    val data = sc.parallelize(1 to 10, 10)
-    // Note that split number starts from 0, so > 8 means only 10th partition left.
-    val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8)
-    assert(prunedRdd.partitions.size === 1)
-    val prunedData = prunedRdd.collect()
-    assert(prunedData.size === 1)
-    assert(prunedData(0) === 10)
-  }
-
-  test("mapWith") {
-    import java.util.Random
-    val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
-    val randoms = ones.mapWith(
-      (index: Int) => new Random(index + 42))
-      {(t: Int, prng: Random) => prng.nextDouble * t}.collect()
-    val prn42_3 = {
-      val prng42 = new Random(42)
-      prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
-    }
-    val prn43_3 = {
-      val prng43 = new Random(43)
-      prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
-    }
-    assert(randoms(2) === prn42_3)
-    assert(randoms(5) === prn43_3)
-  }
-
-  test("flatMapWith") {
-    import java.util.Random
-    val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
-    val randoms = ones.flatMapWith(
-      (index: Int) => new Random(index + 42))
-      {(t: Int, prng: Random) =>
-        val random = prng.nextDouble()
-        Seq(random * t, random * t * 10)}.
-      collect()
-    val prn42_3 = {
-      val prng42 = new Random(42)
-      prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
-    }
-    val prn43_3 = {
-      val prng43 = new Random(43)
-      prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
-    }
-    assert(randoms(5) === prn42_3 * 10)
-    assert(randoms(11) === prn43_3 * 10)
-  }
-
-  test("filterWith") {
-    import java.util.Random
-    val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
-    val sample = ints.filterWith(
-      (index: Int) => new Random(index + 42))
-      {(t: Int, prng: Random) => prng.nextInt(3) == 0}.
-      collect()
-    val checkSample = {
-      val prng42 = new Random(42)
-      val prng43 = new Random(43)
-      Array(1, 2, 3, 4, 5, 6).filter{i =>
-	      if (i < 4) 0 == prng42.nextInt(3)
-	      else 0 == prng43.nextInt(3)}
-    }
-    assert(sample.size === checkSample.size)
-    for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
-  }
-
-  test("top with predefined ordering") {
-    val nums = Array.range(1, 100000)
-    val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
-    val topK = ints.top(5)
-    assert(topK.size === 5)
-    assert(topK === nums.reverse.take(5))
-  }
-
-  test("top with custom ordering") {
-    val words = Vector("a", "b", "c", "d")
-    implicit val ord = implicitly[Ordering[String]].reverse
-    val rdd = sc.makeRDD(words, 2)
-    val topK = rdd.top(2)
-    assert(topK.size === 2)
-    assert(topK.sorted === Array("b", "a"))
-  }
-
-  test("takeOrdered with predefined ordering") {
-    val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-    val rdd = sc.makeRDD(nums, 2)
-    val sortedLowerK = rdd.takeOrdered(5)
-    assert(sortedLowerK.size === 5)
-    assert(sortedLowerK === Array(1, 2, 3, 4, 5))
-  }
-
-  test("takeOrdered with custom ordering") {
-    val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-    implicit val ord = implicitly[Ordering[Int]].reverse
-    val rdd = sc.makeRDD(nums, 2)
-    val sortedTopK = rdd.takeOrdered(5)
-    assert(sortedTopK.size === 5)
-    assert(sortedTopK === Array(10, 9, 8, 7, 6))
-    assert(sortedTopK === nums.sorted(ord).take(5))
-  }
-
-  test("takeSample") {
-    val data = sc.parallelize(1 to 100, 2)
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement=false, 20, seed)
-      assert(sample.size === 20)        // Got exactly 20 elements
-      assert(sample.toSet.size === 20)  // Elements are distinct
-      assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
-    }
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement=false, 200, seed)
-      assert(sample.size === 100)        // Got only 100 elements
-      assert(sample.toSet.size === 100)  // Elements are distinct
-      assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
-    }
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement=true, 20, seed)
-      assert(sample.size === 20)        // Got exactly 20 elements
-      assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
-    }
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement=true, 100, seed)
-      assert(sample.size === 100)        // Got exactly 100 elements
-      // Chance of getting all distinct elements is astronomically low, so test we got < 100
-      assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
-    }
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement=true, 200, seed)
-      assert(sample.size === 200)        // Got exactly 200 elements
-      // Chance of getting all distinct elements is still quite low, so test we got < 100
-      assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
-    }
-  }
-
-  test("runJob on an invalid partition") {
-    intercept[IllegalArgumentException] {
-      sc.runJob(sc.parallelize(1 to 10, 2), {iter: Iterator[Int] => iter.size}, Seq(0, 1, 2), false)
-    }
-  }
-}
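
For context, a minimal sketch (not part of this commit) of the coalesce behaviour the deleted suite above checks, assuming an active SparkContext `sc`:

    val data = sc.parallelize(1 to 10, 10)
    // Without shuffle, coalesce narrows partitions via a NarrowDependency...
    assert(data.coalesce(2).partitions.size === 2)
    // ...and asking for more partitions than exist keeps the original count.
    assert(data.coalesce(20).partitions.size === 10)
    // With shuffle = true the upstream stages stay fully parallel.
    assert(data.coalesce(1, shuffle = true).partitions.size === 1)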

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/SharedSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/SharedSparkContext.scala b/core/src/test/scala/spark/SharedSparkContext.scala
deleted file mode 100644
index 70c2451..0000000
--- a/core/src/test/scala/spark/SharedSparkContext.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.Suite
-import org.scalatest.BeforeAndAfterAll
-
-/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */
-trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
-
-  @transient private var _sc: SparkContext = _
-
-  def sc: SparkContext = _sc
-
-  override def beforeAll() {
-    _sc = new SparkContext("local", "test")
-    super.beforeAll()
-  }
-
-  override def afterAll() {
-    if (_sc != null) {
-      LocalSparkContext.stop(_sc)
-      _sc = null
-    }
-    super.afterAll()
-  }
-}
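
For context, a minimal sketch (not part of this commit) of how a suite mixes in this trait; `MyRDDSuite` is a hypothetical name:

    class MyRDDSuite extends FunSuite with SharedSparkContext {
      // `sc` is created once in beforeAll and stopped in afterAll.
      test("uses the shared local context") {
        assert(sc.parallelize(1 to 3).count() === 3)
      }
    }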

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ShuffleNettySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ShuffleNettySuite.scala b/core/src/test/scala/spark/ShuffleNettySuite.scala
deleted file mode 100644
index 6bad6c1..0000000
--- a/core/src/test/scala/spark/ShuffleNettySuite.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.BeforeAndAfterAll
-
-
-class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
-
-  // This test suite should run all tests in ShuffleSuite with Netty shuffle mode.
-
-  override def beforeAll(configMap: Map[String, Any]) {
-    System.setProperty("spark.shuffle.use.netty", "true")
-  }
-
-  override def afterAll(configMap: Map[String, Any]) {
-    System.setProperty("spark.shuffle.use.netty", "false")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
deleted file mode 100644
index 8745689..0000000
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-
-import spark.SparkContext._
-import spark.ShuffleSuite.NonJavaSerializableClass
-import spark.rdd.{SubtractedRDD, CoGroupedRDD, OrderedRDDFunctions, ShuffledRDD}
-import spark.util.MutablePair
-
-
-class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
-  test("groupByKey without compression") {
-    try {
-      System.setProperty("spark.shuffle.compress", "false")
-      sc = new SparkContext("local", "test")
-      val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
-      val groups = pairs.groupByKey(4).collect()
-      assert(groups.size === 2)
-      val valuesFor1 = groups.find(_._1 == 1).get._2
-      assert(valuesFor1.toList.sorted === List(1, 2, 3))
-      val valuesFor2 = groups.find(_._1 == 2).get._2
-      assert(valuesFor2.toList.sorted === List(1))
-    } finally {
-      System.setProperty("spark.shuffle.compress", "true")
-    }
-  }
-
-  test("shuffle non-zero block size") {
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
-    val NUM_BLOCKS = 3
-
-    val a = sc.parallelize(1 to 10, 2)
-    val b = a.map { x =>
-      (x, new NonJavaSerializableClass(x * 2))
-    }
-    // If the Kryo serializer is not used correctly, the shuffle would fail because the
-    // default Java serializer cannot handle the non serializable class.
-    val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
-      b, new HashPartitioner(NUM_BLOCKS)).setSerializer(classOf[spark.KryoSerializer].getName)
-    val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
-
-    assert(c.count === 10)
-
-    // All blocks must have non-zero size
-    (0 until NUM_BLOCKS).foreach { id =>
-      val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
-      assert(statuses.forall(s => s._2 > 0))
-    }
-  }
-
-  test("shuffle serializer") {
-    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
-    val a = sc.parallelize(1 to 10, 2)
-    val b = a.map { x =>
-      (x, new NonJavaSerializableClass(x * 2))
-    }
-    // If the Kryo serializer is not used correctly, the shuffle would fail because the
-    // default Java serializer cannot handle the non serializable class.
-    val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
-      b, new HashPartitioner(3)).setSerializer(classOf[spark.KryoSerializer].getName)
-    assert(c.count === 10)
-  }
-
-  test("zero sized blocks") {
-    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
-
-    // 10 partitions from 4 keys
-    val NUM_BLOCKS = 10
-    val a = sc.parallelize(1 to 4, NUM_BLOCKS)
-    val b = a.map(x => (x, x*2))
-
-    // NOTE: The default Java serializer doesn't create zero-sized blocks.
-    //       So, use Kryo
-    val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
-      .setSerializer(classOf[spark.KryoSerializer].getName)
-
-    val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
-    assert(c.count === 4)
-
-    val blockSizes = (0 until NUM_BLOCKS).flatMap { id =>
-      val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
-      statuses.map(x => x._2)
-    }
-    val nonEmptyBlocks = blockSizes.filter(x => x > 0)
-
-    // We should have at most 4 non-zero sized partitions
-    assert(nonEmptyBlocks.size <= 4)
-  }
-
-  test("zero sized blocks without kryo") {
-    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
-
-    // 10 partitions from 4 keys
-    val NUM_BLOCKS = 10
-    val a = sc.parallelize(1 to 4, NUM_BLOCKS)
-    val b = a.map(x => (x, x*2))
-
-    // NOTE: The default Java serializer should create zero-sized blocks
-    val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
-
-    val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
-    assert(c.count === 4)
-
-    val blockSizes = (0 until NUM_BLOCKS).flatMap { id =>
-      val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
-      statuses.map(x => x._2)
-    }
-    val nonEmptyBlocks = blockSizes.filter(x => x > 0)
-
-    // We should have at most 4 non-zero sized partitions
-    assert(nonEmptyBlocks.size <= 4)
-  }
-
-  test("shuffle using mutable pairs") {
-    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
-    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
-    val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
-    val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
-    val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
-      .collect()
-
-    data.foreach { pair => results should contain (pair) }
-  }
-
-  test("sorting using mutable pairs") {
-    // This is not in SortingSuite because of the local cluster setup.
-    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
-    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
-    val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
-    val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
-    val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs)
-      .sortByKey().collect()
-    results(0) should be (p(1, 11))
-    results(1) should be (p(2, 22))
-    results(2) should be (p(3, 33))
-    results(3) should be (p(100, 100))
-  }
-
-  test("cogroup using mutable pairs") {
-    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
-    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
-    val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
-    val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
-    val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
-    val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
-    val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
-
-    assert(results(1)(0).length === 3)
-    assert(results(1)(0).contains(1))
-    assert(results(1)(0).contains(2))
-    assert(results(1)(0).contains(3))
-    assert(results(1)(1).length === 2)
-    assert(results(1)(1).contains("11"))
-    assert(results(1)(1).contains("12"))
-    assert(results(2)(0).length === 1)
-    assert(results(2)(0).contains(1))
-    assert(results(2)(1).length === 1)
-    assert(results(2)(1).contains("22"))
-    assert(results(3)(0).length === 0)
-    assert(results(3)(1).contains("3"))
-  }
-
-  test("subtract mutable pairs") {
-    // Use a local cluster with 2 processes to make sure there are both local and remote blocks
-    sc = new SparkContext("local-cluster[2,1,512]", "test")
-    def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
-    val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
-    val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
-    val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
-    val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
-    val results = new SubtractedRDD(pairs1, pairs2, new HashPartitioner(2)).collect()
-    results should have length (1)
-    // subtracted RDD returns results as Tuple2
-    results(0) should be ((3, 33))
-  }
-}
-
-object ShuffleSuite {
-
-  def mergeCombineException(x: Int, y: Int): Int = {
-    throw new SparkException("Exception for map-side combine.")
-    x + y
-  }
-
-  class NonJavaSerializableClass(val value: Int)
-}
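
For context, a minimal sketch (not part of this commit) of the pattern the serializer tests above rely on, reusing only names defined in the deleted suite. The "local-cluster[2,1,512]" master string stands for 2 worker processes with 1 core and 512 MB each, so shuffle fetches hit both local and remote blocks:

    sc = new SparkContext("local-cluster[2,1,512]", "test")
    val pairs = sc.parallelize(1 to 10, 2)
      .map(x => (x, new ShuffleSuite.NonJavaSerializableClass(x)))
    // Values are not Java-serializable, so the shuffle must use Kryo.
    val shuffled = new ShuffledRDD[Int, ShuffleSuite.NonJavaSerializableClass,
        (Int, ShuffleSuite.NonJavaSerializableClass)](pairs, new HashPartitioner(3))
      .setSerializer(classOf[spark.KryoSerializer].getName)
    assert(shuffled.count === 10)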

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala
deleted file mode 100644
index 1ef812d..0000000
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.PrivateMethodTester
-
-class DummyClass1 {}
-
-class DummyClass2 {
-  val x: Int = 0
-}
-
-class DummyClass3 {
-  val x: Int = 0
-  val y: Double = 0.0
-}
-
-class DummyClass4(val d: DummyClass3) {
-  val x: Int = 0
-}
-
-object DummyString {
-  def apply(str: String) : DummyString = new DummyString(str.toArray)
-}
-class DummyString(val arr: Array[Char]) {
-  override val hashCode: Int = 0
-  // JDK-7 has an extra hash32 field http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/rev/11987e85555f
-  @transient val hash32: Int = 0
-}
-
-class SizeEstimatorSuite
-  extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
-
-  var oldArch: String = _
-  var oldOops: String = _
-
-  override def beforeAll() {
-    // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
-    oldArch = System.setProperty("os.arch", "amd64")
-    oldOops = System.setProperty("spark.test.useCompressedOops", "true")
-  }
-
-  override def afterAll() {
-    resetOrClear("os.arch", oldArch)
-    resetOrClear("spark.test.useCompressedOops", oldOops)
-  }
-
-  test("simple classes") {
-    assert(SizeEstimator.estimate(new DummyClass1) === 16)
-    assert(SizeEstimator.estimate(new DummyClass2) === 16)
-    assert(SizeEstimator.estimate(new DummyClass3) === 24)
-    assert(SizeEstimator.estimate(new DummyClass4(null)) === 24)
-    assert(SizeEstimator.estimate(new DummyClass4(new DummyClass3)) === 48)
-  }
-
-  // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
-  // (Sun vs IBM). Use a DummyString class to make tests deterministic.
-  test("strings") {
-    assert(SizeEstimator.estimate(DummyString("")) === 40)
-    assert(SizeEstimator.estimate(DummyString("a")) === 48)
-    assert(SizeEstimator.estimate(DummyString("ab")) === 48)
-    assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
-  }
-
-  test("primitive arrays") {
-    assert(SizeEstimator.estimate(new Array[Byte](10)) === 32)
-    assert(SizeEstimator.estimate(new Array[Char](10)) === 40)
-    assert(SizeEstimator.estimate(new Array[Short](10)) === 40)
-    assert(SizeEstimator.estimate(new Array[Int](10)) === 56)
-    assert(SizeEstimator.estimate(new Array[Long](10)) === 96)
-    assert(SizeEstimator.estimate(new Array[Float](10)) === 56)
-    assert(SizeEstimator.estimate(new Array[Double](10)) === 96)
-    assert(SizeEstimator.estimate(new Array[Int](1000)) === 4016)
-    assert(SizeEstimator.estimate(new Array[Long](1000)) === 8016)
-  }
-
-  test("object arrays") {
-    // Arrays containing nulls should just have one pointer per element
-    assert(SizeEstimator.estimate(new Array[String](10)) === 56)
-    assert(SizeEstimator.estimate(new Array[AnyRef](10)) === 56)
-
-    // For object arrays with non-null elements, each object should take one pointer plus
-    // however many bytes that class takes. (Note that Array.fill calls the code in its
-    // second parameter separately for each object, so we get distinct objects.)
-    assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)) === 216)
-    assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)) === 216)
-    assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)) === 296)
-    assert(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)) === 56)
-
-    // Past size 100, we only sample 100 elements, but we should still get the right size.
-    assert(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)) === 28016)
-
-    // If an array contains the *same* element many times, we should only count it once.
-    val d1 = new DummyClass1
-    assert(SizeEstimator.estimate(Array.fill(10)(d1)) === 72) // 10 pointers plus 8-byte object
-    assert(SizeEstimator.estimate(Array.fill(100)(d1)) === 432) // 100 pointers plus 8-byte object
-
-    // Same thing with huge array containing the same element many times. Note that this won't
-    // return exactly 4032 because it can't tell that *all* the elements will equal the first
-    // one it samples, but it should be close to that.
-
-    // TODO: If we sample 100 elements, this should always be 4176 ?
-    val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1))
-    assert(estimatedSize >= 4000, "Estimated size " + estimatedSize + " should be more than 4000")
-    assert(estimatedSize <= 4200, "Estimated size " + estimatedSize + " should be less than 4200")
-  }
-
-  test("32-bit arch") {
-    val arch = System.setProperty("os.arch", "x86")
-
-    val initialize = PrivateMethod[Unit]('initialize)
-    SizeEstimator invokePrivate initialize()
-
-    assert(SizeEstimator.estimate(DummyString("")) === 40)
-    assert(SizeEstimator.estimate(DummyString("a")) === 48)
-    assert(SizeEstimator.estimate(DummyString("ab")) === 48)
-    assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
-
-    resetOrClear("os.arch", arch)
-  }
-
-  // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
-  // (Sun vs IBM). Use a DummyString class to make tests deterministic.
-  test("64-bit arch with no compressed oops") {
-    val arch = System.setProperty("os.arch", "amd64")
-    val oops = System.setProperty("spark.test.useCompressedOops", "false")
-
-    val initialize = PrivateMethod[Unit]('initialize)
-    SizeEstimator invokePrivate initialize()
-
-    assert(SizeEstimator.estimate(DummyString("")) === 56)
-    assert(SizeEstimator.estimate(DummyString("a")) === 64)
-    assert(SizeEstimator.estimate(DummyString("ab")) === 64)
-    assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 72)
-
-    resetOrClear("os.arch", arch)
-    resetOrClear("spark.test.useCompressedOops", oops)
-  }
-
-  def resetOrClear(prop: String, oldValue: String) {
-    if (oldValue != null) {
-      System.setProperty(prop, oldValue)
-    } else {
-      System.clearProperty(prop)
-    }
-  }
-}
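
As a rough cross-check of the "strings" expectations above (an approximation only, assuming the 64-bit, compressed-oops object layout that beforeAll configures):

    // DummyString instance: 12-byte header + 4-byte compressed ref (arr)
    //                       + 4-byte int (hashCode) + 4-byte int (hash32)  = 24 bytes
    // empty Array[Char]:    12-byte header + 4-byte length + 0 data        = 16 bytes
    // DummyString("")        ~ 24 + 16                                     = 40
    // DummyString("a")       ~ 24 + (16 + 2 bytes of chars, padded to 24)  = 48
    // DummyString("abcdefgh") ~ 24 + (16 + 16 bytes of chars)              = 56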

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/SortingSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
deleted file mode 100644
index b933c4a..0000000
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-import org.scalatest.matchers.ShouldMatchers
-import SparkContext._
-
-class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers with Logging {
-
-  test("sortByKey") {
-    val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
-    assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
-  }
-
-  test("large array") {
-    val rand = new scala.util.Random()
-    val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
-    val pairs = sc.parallelize(pairArr, 2)
-    val sorted = pairs.sortByKey()
-    assert(sorted.partitions.size === 2)
-    assert(sorted.collect() === pairArr.sortBy(_._1))
-  }
-
-  test("large array with one split") {
-    val rand = new scala.util.Random()
-    val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
-    val pairs = sc.parallelize(pairArr, 2)
-    val sorted = pairs.sortByKey(true, 1)
-    assert(sorted.partitions.size === 1)
-    assert(sorted.collect() === pairArr.sortBy(_._1))
-  }
-
-  test("large array with many partitions") {
-    val rand = new scala.util.Random()
-    val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
-    val pairs = sc.parallelize(pairArr, 2)
-    val sorted = pairs.sortByKey(true, 20)
-    assert(sorted.partitions.size === 20)
-    assert(sorted.collect() === pairArr.sortBy(_._1))
-  }
-
-  test("sort descending") {
-    val rand = new scala.util.Random()
-    val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
-    val pairs = sc.parallelize(pairArr, 2)
-    assert(pairs.sortByKey(false).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
-  }
-
-  test("sort descending with one split") {
-    val rand = new scala.util.Random()
-    val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
-    val pairs = sc.parallelize(pairArr, 1)
-    assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
-  }
-
-  test("sort descending with many partitions") {
-    val rand = new scala.util.Random()
-    val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
-    val pairs = sc.parallelize(pairArr, 2)
-    assert(pairs.sortByKey(false, 20).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
-  }
-
-  test("more partitions than elements") {
-    val rand = new scala.util.Random()
-    val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) }
-    val pairs = sc.parallelize(pairArr, 30)
-    assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
-  }
-
-  test("empty RDD") {
-    val pairArr = new Array[(Int, Int)](0)
-    val pairs = sc.parallelize(pairArr, 2)
-    assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
-  }
-
-  test("partition balancing") {
-    val pairArr = (1 to 1000).map(x => (x, x)).toArray
-    val sorted = sc.parallelize(pairArr, 4).sortByKey()
-    assert(sorted.collect() === pairArr.sortBy(_._1))
-    val partitions = sorted.collectPartitions()
-    logInfo("Partition lengths: " + partitions.map(_.length).mkString(", "))
-    partitions(0).length should be > 180
-    partitions(1).length should be > 180
-    partitions(2).length should be > 180
-    partitions(3).length should be > 180
-    partitions(0).last should be < partitions(1).head
-    partitions(1).last should be < partitions(2).head
-    partitions(2).last should be < partitions(3).head
-  }
-
-  test("partition balancing for descending sort") {
-    val pairArr = (1 to 1000).map(x => (x, x)).toArray
-    val sorted = sc.parallelize(pairArr, 4).sortByKey(false)
-    assert(sorted.collect() === pairArr.sortBy(_._1).reverse)
-    val partitions = sorted.collectPartitions()
-    logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
-    partitions(0).length should be > 180
-    partitions(1).length should be > 180
-    partitions(2).length should be > 180
-    partitions(3).length should be > 180
-    partitions(0).last should be > partitions(1).head
-    partitions(1).last should be > partitions(2).head
-    partitions(2).last should be > partitions(3).head
-  }
-}
-
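
For context, a minimal sketch (not part of this commit) of the sortByKey variants the deleted suite above covers, assuming an active SparkContext `sc`:

    val pairs = sc.parallelize(Array((3, 0), (1, 0), (2, 0)), 2)
    assert(pairs.sortByKey().collect() === Array((1, 0), (2, 0), (3, 0)))
    // Descending order, with an explicit number of output partitions.
    assert(pairs.sortByKey(false, 2).collect() === Array((3, 0), (2, 0), (1, 0)))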

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/SparkContextInfoSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/SparkContextInfoSuite.scala b/core/src/test/scala/spark/SparkContextInfoSuite.scala
deleted file mode 100644
index 6d50bf5..0000000
--- a/core/src/test/scala/spark/SparkContextInfoSuite.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import spark.SparkContext._
-
-class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
-  test("getPersistentRDDs only returns RDDs that are marked as cached") {
-    sc = new SparkContext("local", "test")
-    assert(sc.getPersistentRDDs.isEmpty === true)
-
-    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    assert(sc.getPersistentRDDs.isEmpty === true)
-
-    rdd.cache()
-    assert(sc.getPersistentRDDs.size === 1)
-    assert(sc.getPersistentRDDs.values.head === rdd)
-  }
-
-  test("getPersistentRDDs returns an immutable map") {
-    sc = new SparkContext("local", "test")
-    val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
-    val myRdds = sc.getPersistentRDDs
-    assert(myRdds.size === 1)
-    assert(myRdds.values.head === rdd1)
-
-    val rdd2 = sc.makeRDD(Array(5, 6, 7, 8), 1).cache()
-
-    // getPersistentRDDs should have 2 RDDs, but myRdds should not change
-    assert(sc.getPersistentRDDs.size === 2)
-    assert(myRdds.size === 1)
-  }
-
-  test("getRDDStorageInfo only reports on RDDs that actually persist data") {
-    sc = new SparkContext("local", "test")
-    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
-    assert(sc.getRDDStorageInfo.size === 0)
-
-    rdd.collect()
-    assert(sc.getRDDStorageInfo.size === 1)
-  }
-}
\ No newline at end of file
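
For context, a minimal sketch (not part of this commit) contrasting the two introspection calls the deleted suite above covers, assuming an active SparkContext `sc`:

    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
    assert(sc.getPersistentRDDs.size === 1)   // marked as cached immediately
    assert(sc.getRDDStorageInfo.size === 0)   // no blocks stored yet
    rdd.collect()
    assert(sc.getRDDStorageInfo.size === 1)   // blocks exist after the action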

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ThreadingSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
deleted file mode 100644
index f2acd0b..0000000
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util.concurrent.Semaphore
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import SparkContext._
-
-/**
- * Holds state shared across task threads in some ThreadingSuite tests.
- */
-object ThreadingSuiteState {
-  val runningThreads = new AtomicInteger
-  val failed = new AtomicBoolean
-
-  def clear() {
-    runningThreads.set(0)
-    failed.set(false)
-  }
-}
-
-class ThreadingSuite extends FunSuite with LocalSparkContext {
-  
-  test("accessing SparkContext form a different thread") {
-    sc = new SparkContext("local", "test")
-    val nums = sc.parallelize(1 to 10, 2)
-    val sem = new Semaphore(0)
-    @volatile var answer1: Int = 0
-    @volatile var answer2: Int = 0
-    new Thread {
-      override def run() {
-        answer1 = nums.reduce(_ + _)
-        answer2 = nums.first()    // This will run "locally" in the current thread
-        sem.release()
-      }
-    }.start()
-    sem.acquire()
-    assert(answer1 === 55)
-    assert(answer2 === 1)
-  }
-
-  test("accessing SparkContext form multiple threads") {
-    sc = new SparkContext("local", "test")
-    val nums = sc.parallelize(1 to 10, 2)
-    val sem = new Semaphore(0)
-    @volatile var ok = true
-    for (i <- 0 until 10) {
-      new Thread {
-        override def run() {
-          val answer1 = nums.reduce(_ + _)
-          if (answer1 != 55) {
-            printf("In thread %d: answer1 was %d\n", i, answer1)
-            ok = false
-          }
-          val answer2 = nums.first()    // This will run "locally" in the current thread
-          if (answer2 != 1) {
-            printf("In thread %d: answer2 was %d\n", i, answer2)
-            ok = false
-          }
-          sem.release()
-        }
-      }.start()
-    }
-    sem.acquire(10)
-    if (!ok) {
-      fail("One or more threads got the wrong answer from an RDD operation")
-    }
-  }
-
-  test("accessing multi-threaded SparkContext form multiple threads") {
-    sc = new SparkContext("local[4]", "test")
-    val nums = sc.parallelize(1 to 10, 2)
-    val sem = new Semaphore(0)
-    @volatile var ok = true
-    for (i <- 0 until 10) {
-      new Thread {
-        override def run() {
-          val answer1 = nums.reduce(_ + _)
-          if (answer1 != 55) {
-            printf("In thread %d: answer1 was %d\n", i, answer1)
-            ok = false
-          }
-          val answer2 = nums.first()    // This will run "locally" in the current thread
-          if (answer2 != 1) {
-            printf("In thread %d: answer2 was %d\n", i, answer2)
-            ok = false
-          }
-          sem.release()
-        }
-      }.start()
-    }
-    sem.acquire(10)
-    if (!ok) {
-      fail("One or more threads got the wrong answer from an RDD operation")
-    }
-  }
-
-  test("parallel job execution") {
-    // This test launches two jobs with two threads each on a 4-core local cluster. Each thread
-    // waits until there are 4 threads running at once, to test that both jobs have been launched.
-    sc = new SparkContext("local[4]", "test")
-    val nums = sc.parallelize(1 to 2, 2)
-    val sem = new Semaphore(0)
-    ThreadingSuiteState.clear()
-    for (i <- 0 until 2) {
-      new Thread {
-        override def run() {
-          val ans = nums.map(number => {
-            val running = ThreadingSuiteState.runningThreads
-            running.getAndIncrement()
-            val time = System.currentTimeMillis()
-            while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
-              Thread.sleep(100)
-            }
-            if (running.get() != 4) {
-              println("Waited 1 second without seeing runningThreads = 4 (it was " +
-                running.get() + "); failing test")
-              ThreadingSuiteState.failed.set(true)
-            }
-            number
-          }).collect()
-          assert(ans.toList === List(1, 2))
-          sem.release()
-        }
-      }.start()
-    }
-    sem.acquire(2)
-    if (ThreadingSuiteState.failed.get()) {
-      fail("One or more threads didn't see runningThreads = 4")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/UnpersistSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/UnpersistSuite.scala b/core/src/test/scala/spark/UnpersistSuite.scala
deleted file mode 100644
index 93977d1..0000000
--- a/core/src/test/scala/spark/UnpersistSuite.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Timeouts._
-import org.scalatest.time.{Span, Millis}
-import spark.SparkContext._
-
-class UnpersistSuite extends FunSuite with LocalSparkContext {
-  test("unpersist RDD") {
-    sc = new SparkContext("local", "test")
-    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-    rdd.count
-    assert(sc.persistentRdds.isEmpty === false)
-    rdd.unpersist()
-    assert(sc.persistentRdds.isEmpty === true)
-
-    failAfter(Span(3000, Millis)) {
-      try {
-        while (! sc.getRDDStorageInfo.isEmpty) {
-          Thread.sleep(200)
-        }
-      } catch {
-        // Ignore exceptions and keep polling: the block manager may be
-        // racing this thread to remove entries from the driver.
-        case _ => Thread.sleep(10)
-      }
-    }
-    assert(sc.getRDDStorageInfo.isEmpty === true)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
deleted file mode 100644
index 98a6c1a..0000000
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File}
-import org.scalatest.FunSuite
-import org.apache.commons.io.FileUtils
-import scala.util.Random
-
-class UtilsSuite extends FunSuite {
-
-  test("bytesToString") {
-    assert(Utils.bytesToString(10) === "10.0 B")
-    assert(Utils.bytesToString(1500) === "1500.0 B")
-    assert(Utils.bytesToString(2000000) === "1953.1 KB")
-    assert(Utils.bytesToString(2097152) === "2.0 MB")
-    assert(Utils.bytesToString(2306867) === "2.2 MB")
-    assert(Utils.bytesToString(5368709120L) === "5.0 GB")
-    assert(Utils.bytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
-  }
-
-  test("copyStream") {
-    // Initialize the input array with random bytes
-    val bytes = Array.ofDim[Byte](9000)
-    Random.nextBytes(bytes)
-
-    val os = new ByteArrayOutputStream()
-    Utils.copyStream(new ByteArrayInputStream(bytes), os)
-
-    assert(os.toByteArray.toList.equals(bytes.toList))
-  }
-
-  test("memoryStringToMb") {
-    assert(Utils.memoryStringToMb("1") === 0)
-    assert(Utils.memoryStringToMb("1048575") === 0)
-    assert(Utils.memoryStringToMb("3145728") === 3)
-
-    assert(Utils.memoryStringToMb("1024k") === 1)
-    assert(Utils.memoryStringToMb("5000k") === 4)
-    assert(Utils.memoryStringToMb("4024k") === Utils.memoryStringToMb("4024K"))
-
-    assert(Utils.memoryStringToMb("1024m") === 1024)
-    assert(Utils.memoryStringToMb("5000m") === 5000)
-    assert(Utils.memoryStringToMb("4024m") === Utils.memoryStringToMb("4024M"))
-
-    assert(Utils.memoryStringToMb("2g") === 2048)
-    assert(Utils.memoryStringToMb("3g") === Utils.memoryStringToMb("3G"))
-
-    assert(Utils.memoryStringToMb("2t") === 2097152)
-    assert(Utils.memoryStringToMb("3t") === Utils.memoryStringToMb("3T"))
-  }
-
-  test("splitCommandString") {
-    assert(Utils.splitCommandString("") === Seq())
-    assert(Utils.splitCommandString("a") === Seq("a"))
-    assert(Utils.splitCommandString("aaa") === Seq("aaa"))
-    assert(Utils.splitCommandString("a b c") === Seq("a", "b", "c"))
-    assert(Utils.splitCommandString("  a   b\t c ") === Seq("a", "b", "c"))
-    assert(Utils.splitCommandString("a 'b c'") === Seq("a", "b c"))
-    assert(Utils.splitCommandString("a 'b c' d") === Seq("a", "b c", "d"))
-    assert(Utils.splitCommandString("'b c'") === Seq("b c"))
-    assert(Utils.splitCommandString("a \"b c\"") === Seq("a", "b c"))
-    assert(Utils.splitCommandString("a \"b c\" d") === Seq("a", "b c", "d"))
-    assert(Utils.splitCommandString("\"b c\"") === Seq("b c"))
-    assert(Utils.splitCommandString("a 'b\" c' \"d' e\"") === Seq("a", "b\" c", "d' e"))
-    assert(Utils.splitCommandString("a\t'b\nc'\nd") === Seq("a", "b\nc", "d"))
-    assert(Utils.splitCommandString("a \"b\\\\c\"") === Seq("a", "b\\c"))
-    assert(Utils.splitCommandString("a \"b\\\"c\"") === Seq("a", "b\"c"))
-    assert(Utils.splitCommandString("a 'b\\\"c'") === Seq("a", "b\\\"c"))
-    assert(Utils.splitCommandString("'a'b") === Seq("ab"))
-    assert(Utils.splitCommandString("'a''b'") === Seq("ab"))
-    assert(Utils.splitCommandString("\"a\"b") === Seq("ab"))
-    assert(Utils.splitCommandString("\"a\"\"b\"") === Seq("ab"))
-    assert(Utils.splitCommandString("''") === Seq(""))
-    assert(Utils.splitCommandString("\"\"") === Seq(""))
-  }
-
-  test("string formatting of time durations") {
-    val second = 1000
-    val minute = second * 60
-    val hour = minute * 60
-    def str = Utils.msDurationToString(_)
-
-    assert(str(123) === "123 ms")
-    assert(str(second) === "1.0 s")
-    assert(str(second + 462) === "1.5 s")
-    assert(str(hour) === "1.00 h")
-    assert(str(minute) === "1.0 m")
-    assert(str(minute + 4 * second + 34) === "1.1 m")
-    assert(str(10 * hour + minute + 4 * second) === "10.02 h")
-    assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h")
-  }
-
-  test("reading offset bytes of a file") {
-    val tmpDir2 = Files.createTempDir()
-    val f1Path = tmpDir2 + "/f1"
-    val f1 = new FileOutputStream(f1Path)
-    f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))
-    f1.close()
-
-    // Read first few bytes
-    assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3")
-
-    // Read some middle bytes
-    assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6")
-
-    // Read last few bytes
-    assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n")
-
-    // Read some nonexistent bytes in the beginning
-    assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3")
-
-    // Read some nonexistent bytes at the end
-    assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n")
-
-    // Read some nonexistent bytes on both ends
-    assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
-
-    FileUtils.deleteDirectory(tmpDir2)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ZippedPartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
deleted file mode 100644
index bb5d379..0000000
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import scala.collection.immutable.NumericRange
-
-import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
-
-import SparkContext._
-
-
-object ZippedPartitionsSuite {
-  def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]) : Iterator[Int] = {
-    Iterator(i.toArray.size, s.toArray.size, d.toArray.size)
-  }
-}
-
-class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
-  test("print sizes") {
-    val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
-    val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
-
-    val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)
-
-    val obtainedSizes = zippedRDD.collect()
-    val expectedSizes = Array(2, 3, 1, 2, 3, 1)
-    assert(obtainedSizes.size == 6)
-    assert(obtainedSizes.zip(expectedSizes).forall(x => x._1 == x._2))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/io/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
deleted file mode 100644
index 1ba82fe..0000000
--- a/core/src/test/scala/spark/io/CompressionCodecSuite.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.io
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import org.scalatest.FunSuite
-
-
-class CompressionCodecSuite extends FunSuite {
-
-  def testCodec(codec: CompressionCodec) {
-    // Write byte values (i % 256 for i in 1 until 1000) to the compressed output stream.
-    val outputStream = new ByteArrayOutputStream()
-    val out = codec.compressedOutputStream(outputStream)
-    for (i <- 1 until 1000) {
-      out.write(i % 256)
-    }
-    out.close()
-
-    // Read the byte values back and verify each one.
-    val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
-    val in = codec.compressedInputStream(inputStream)
-    for (i <- 1 until 1000) {
-      assert(in.read() === i % 256)
-    }
-    in.close()
-  }
-
-  test("default compression codec") {
-    val codec = CompressionCodec.createCodec()
-    assert(codec.getClass === classOf[SnappyCompressionCodec])
-    testCodec(codec)
-  }
-
-  test("lzf compression codec") {
-    val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
-    assert(codec.getClass === classOf[LZFCompressionCodec])
-    testCodec(codec)
-  }
-
-  test("snappy compression codec") {
-    val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
-    assert(codec.getClass === classOf[SnappyCompressionCodec])
-    testCodec(codec)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
deleted file mode 100644
index b0213b6..0000000
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics
-
-import org.scalatest.{BeforeAndAfter, FunSuite}
-
-class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
-  var filePath: String = _
-
-  before {
-    filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile()
-  }
-
-  test("MetricsConfig with default properties") {
-    val conf = new MetricsConfig(Option("dummy-file"))
-    conf.initialize()
-
-    assert(conf.properties.size() === 5)
-    assert(conf.properties.getProperty("test-for-dummy") === null)
-
-    val property = conf.getInstance("random")
-    assert(property.size() === 3)
-    assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
-    assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
-    assert(property.getProperty("sink.servlet.sample") === "false")
-  }
-
-  test("MetricsConfig with properties set") {
-    val conf = new MetricsConfig(Option(filePath))
-    conf.initialize()
-
-    val masterProp = conf.getInstance("master")
-    assert(masterProp.size() === 6)
-    assert(masterProp.getProperty("sink.console.period") === "20")
-    assert(masterProp.getProperty("sink.console.unit") === "minutes")
-    assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
-    assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
-    assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
-    assert(masterProp.getProperty("sink.servlet.sample") === "false")
-
-    val workerProp = conf.getInstance("worker")
-    assert(workerProp.size() === 6)
-    assert(workerProp.getProperty("sink.console.period") === "10")
-    assert(workerProp.getProperty("sink.console.unit") === "seconds")
-    assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
-    assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
-    assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
-    assert(workerProp.getProperty("sink.servlet.sample") === "false")
-  }
-
-  test("MetricsConfig with subProperties") {
-    val conf = new MetricsConfig(Option(filePath))
-    conf.initialize()
-
-    val propCategories = conf.propertyCategories
-    assert(propCategories.size === 3)
-
-    val masterProp = conf.getInstance("master")
-    val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
-    assert(sourceProps.size === 1)
-    assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
-
-    val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
-    assert(sinkProps.size === 2)
-    assert(sinkProps.contains("console"))
-    assert(sinkProps.contains("servlet"))
-
-    val consoleProps = sinkProps("console")
-    assert(consoleProps.size() === 2)
-
-    val servletProps = sinkProps("servlet")
-    assert(servletProps.size() === 3)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
deleted file mode 100644
index dc65ac6..0000000
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics
-
-import org.scalatest.{BeforeAndAfter, FunSuite}
-
-class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
-  var filePath: String = _
-
-  before {
-    filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
-    System.setProperty("spark.metrics.conf", filePath)
-  }
-
-  test("MetricsSystem with default config") {
-    val metricsSystem = MetricsSystem.createMetricsSystem("default")
-    val sources = metricsSystem.sources
-    val sinks = metricsSystem.sinks
-
-    assert(sources.length === 0)
-    assert(sinks.length === 0)
-    assert(!metricsSystem.getServletHandlers.isEmpty)
-  }
-
-  test("MetricsSystem with sources add") {
-    val metricsSystem = MetricsSystem.createMetricsSystem("test")
-    val sources = metricsSystem.sources
-    val sinks = metricsSystem.sinks
-
-    assert(sources.length === 0)
-    assert(sinks.length === 1)
-    assert(!metricsSystem.getServletHandlers.isEmpty)
-
-    val source = new spark.deploy.master.MasterSource(null)
-    metricsSystem.registerSource(source)
-    assert(sources.length === 1)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
deleted file mode 100644
index dc8ca94..0000000
--- a/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.{ BeforeAndAfter, FunSuite }
-import spark.SparkContext._
-import spark.rdd.JdbcRDD
-import java.sql._
-
-class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
-  before {
-    Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
-    val conn = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
-    try {
-      val create = conn.createStatement
-      create.execute("""
-        CREATE TABLE FOO(
-          ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
-          DATA INTEGER
-        )""")
-      create.close
-      val insert = conn.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
-      (1 to 100).foreach { i =>
-        insert.setInt(1, i * 2)
-        insert.executeUpdate
-      }
-      insert.close
-    } catch {
-      case e: SQLException if e.getSQLState == "X0Y32" =>
-        // table exists
-    } finally {
-      conn.close
-    }
-  }
-
-  test("basic functionality") {
-    sc = new SparkContext("local", "test")
-    val rdd = new JdbcRDD(
-      sc,
-      () => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
-      "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
-      1, 100, 3,
-      (r: ResultSet) => { r.getInt(1) } ).cache
-
-    assert(rdd.count === 100)
-    assert(rdd.reduce(_+_) === 10100)
-  }
-
-  after {
-    try {
-      DriverManager.getConnection("jdbc:derby:;shutdown=true")
-    } catch {
-      case se: SQLException if se.getSQLState == "XJ015" =>
-        // normal shutdown
-    }
-  }
-}