You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:58:50 UTC
[06/69] [abbrv] [partial] Initial work to rename package to
org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/PartitioningSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
deleted file mode 100644
index b1e0b2b..0000000
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import scala.collection.mutable.ArrayBuffer
-import SparkContext._
-import spark.util.StatCounter
-import scala.math.abs
-
-/**
- * Tests for partitioner equality, partitioner propagation through pair-RDD operations,
- * rejection of Java array keys, and correct handling of empty partitions.
- */
-class PartitioningSuite extends FunSuite with SharedSparkContext {
-
-  test("HashPartitioner equality") {
-    val p2 = new HashPartitioner(2)
-    val p4 = new HashPartitioner(4)
-    val anotherP4 = new HashPartitioner(4)
-    assert(p2 === p2)
-    assert(p4 === p4)
-    assert(p2 != p4)
-    assert(p4 != p2)
-    assert(p4 === anotherP4)
-    assert(anotherP4 === p4)
-  }
-
-  test("RangePartitioner equality") {
-    // Make an RDD where all the elements are the same so that the partition range bounds
-    // are deterministically all the same.
-    val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x))
-
-    val p2 = new RangePartitioner(2, rdd)
-    val p4 = new RangePartitioner(4, rdd)
-    val anotherP4 = new RangePartitioner(4, rdd)
-    // Passing false as the third argument builds the descending variants compared below.
-    val descendingP2 = new RangePartitioner(2, rdd, false)
-    val descendingP4 = new RangePartitioner(4, rdd, false)
-
-    assert(p2 === p2)
-    assert(p4 === p4)
-    assert(p2 != p4)
-    assert(p4 != p2)
-    assert(p4 === anotherP4)
-    assert(anotherP4 === p4)
-    assert(descendingP2 === descendingP2)
-    assert(descendingP4 === descendingP4)
-    assert(descendingP2 != descendingP4)
-    assert(descendingP4 != descendingP2)
-    assert(p2 != descendingP2)
-    assert(p4 != descendingP4)
-    assert(descendingP2 != p2)
-    assert(descendingP4 != p4)
-  }
-
-  test("HashPartitioner not equal to RangePartitioner") {
-    val rdd = sc.parallelize(1 to 10).map(x => (x, x))
-    val rangeP2 = new RangePartitioner(2, rdd)
-    val hashP2 = new HashPartitioner(2)
-    assert(rangeP2 === rangeP2)
-    assert(hashP2 === hashP2)
-    assert(hashP2 != rangeP2)
-    assert(rangeP2 != hashP2)
-  }
-
-  test("partitioner preservation") {
-    val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
-
-    val grouped2 = rdd.groupByKey(2)
-    val grouped4 = rdd.groupByKey(4)
-    val reduced2 = rdd.reduceByKey(_ + _, 2)
-    val reduced4 = rdd.reduceByKey(_ + _, 4)
-
-    assert(rdd.partitioner === None)
-
-    assert(grouped2.partitioner === Some(new HashPartitioner(2)))
-    assert(grouped4.partitioner === Some(new HashPartitioner(4)))
-    assert(reduced2.partitioner === Some(new HashPartitioner(2)))
-    assert(reduced4.partitioner === Some(new HashPartitioner(4)))
-
-    assert(grouped2.groupByKey().partitioner === grouped2.partitioner)
-    assert(grouped2.groupByKey(3).partitioner != grouped2.partitioner)
-    assert(grouped2.groupByKey(2).partitioner === grouped2.partitioner)
-    assert(grouped4.groupByKey().partitioner === grouped4.partitioner)
-    assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner)
-    assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner)
-
-    assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
-    assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
-    assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
-    assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
-
-    assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
-    assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
-    assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
-    assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
-
-    assert(grouped2.map(_ => 1).partitioner === None)
-    assert(grouped2.mapValues(_ => 1).partitioner === grouped2.partitioner)
-    assert(grouped2.flatMapValues(_ => Seq(1)).partitioner === grouped2.partitioner)
-    assert(grouped2.filter(_._1 > 4).partitioner === grouped2.partitioner)
-  }
-
-  test("partitioning Java arrays should fail") {
-    val arrs: RDD[Array[Int]] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => Array(x))
-    val arrPairs: RDD[(Array[Int], Int)] =
-      sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))
-
-    // Runs `op` and checks it is rejected with a SparkException whose message mentions arrays.
-    def assertRejectsArrays(op: => Any): Unit = {
-      assert(intercept[SparkException] { op }.getMessage.contains("array"))
-    }
-
-    assertRejectsArrays { arrs.distinct() }
-    // We can't catch all usages of arrays, since they might occur inside other collections,
-    // so arrPairs.distinct() (array nested in a tuple) is deliberately not checked.
-    assertRejectsArrays { arrPairs.partitionBy(new HashPartitioner(2)) }
-    assertRejectsArrays { arrPairs.join(arrPairs) }
-    assertRejectsArrays { arrPairs.leftOuterJoin(arrPairs) }
-    assertRejectsArrays { arrPairs.rightOuterJoin(arrPairs) }
-    assertRejectsArrays { arrPairs.groupByKey() }
-    assertRejectsArrays { arrPairs.countByKey() }
-    assertRejectsArrays { arrPairs.countByKeyApprox(1) }
-    assertRejectsArrays { arrPairs.cogroup(arrPairs) }
-    assertRejectsArrays { arrPairs.reduceByKeyLocally(_ + _) }
-    assertRejectsArrays { arrPairs.reduceByKey(_ + _) }
-  }
-
-  test("zero-length partitions should be correctly handled") {
-    // Create RDD with some consecutive empty partitions (including the "first" one)
-    val rdd: RDD[Double] = sc
-      .parallelize(Array(-1.0, -1.0, -1.0, -1.0, 2.0, 4.0, -1.0, -1.0), 8)
-      .filter(_ >= 0.0)
-
-    // Run the partitions, including the consecutive empty ones, through StatCounter.
-    // Surviving values are 2.0 and 4.0: sum 6, mean 3, variance 1, stdev 1.
-    val stats: StatCounter = rdd.stats()
-    assert(abs(6.0 - stats.sum) < 0.01)
-    assert(abs(6.0 / 2 - rdd.mean) < 0.01)
-    assert(abs(1.0 - rdd.variance) < 0.01)
-    assert(abs(1.0 - rdd.stdev) < 0.01)
-
-    // Add other tests here for classes that should be able to handle empty partitions correctly
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/PipedRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
deleted file mode 100644
index 35c0471..0000000
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import SparkContext._
-
-/** Tests for `RDD.pipe`, which streams each partition's elements through an external command. */
-class PipedRDDSuite extends FunSuite with SharedSparkContext {
-
-  test("basic pipe") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-
-    val piped = nums.pipe(Seq("cat"))
-
-    val c = piped.collect()
-    assert(c.size === 4)
-    assert(c(0) === "1")
-    assert(c(1) === "2")
-    assert(c(2) === "3")
-    assert(c(3) === "4")
-  }
-
-  test("advanced pipe") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val bl = sc.broadcast(List("0"))
-
-    // Per partition, the context printer writes the broadcast values and a U+0001 separator
-    // ahead of the elements; each element is then written with a "_" suffix.
-    val piped = nums.pipe(Seq("cat"),
-      Map[String, String](),
-      (f: String => Unit) => { bl.value.foreach(f); f("\u0001") },
-      (i: Int, f: String => Unit) => f(i + "_"))
-
-    val c = piped.collect()
-
-    assert(c.size === 8)
-    assert(c(0) === "0")
-    assert(c(1) === "\u0001")
-    assert(c(2) === "1_")
-    assert(c(3) === "2_")
-    assert(c(4) === "0")
-    assert(c(5) === "\u0001")
-    assert(c(6) === "3_")
-    assert(c(7) === "4_")
-
-    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
-    val grouped = nums1.groupBy(str => str.split("\t")(0))
-    val d = grouped.pipe(Seq("cat"),
-      Map[String, String](),
-      (f: String => Unit) => { bl.value.foreach(f); f("\u0001") },
-      (i: (String, Seq[String]), f: String => Unit) => i._2.foreach(e => f(e + "_"))).collect()
-    assert(d.size === 8)
-    assert(d(0) === "0")
-    assert(d(1) === "\u0001")
-    assert(d(2) === "b\t2_")
-    assert(d(3) === "b\t4_")
-    assert(d(4) === "0")
-    assert(d(5) === "\u0001")
-    assert(d(6) === "a\t1_")
-    assert(d(7) === "a\t3_")
-  }
-
-  test("pipe with env variable") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
-    val c = piped.collect()
-    assert(c.size === 2)
-    assert(c(0) === "LALALA")
-    assert(c(1) === "LALALA")
-  }
-
-  test("pipe with non-zero exit status") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    // The Seq is an argv list (compare Seq("printenv", "MY_TEST_ENV") above), presumably
-    // launched without a shell: the program and its argument must be separate elements, and
-    // shell redirections like "2> /dev/null" are not interpreted. Passing the whole command
-    // as one token would test a launch failure instead of a non-zero exit status.
-    val piped = nums.pipe(Seq("cat", "nonexistent_file"))
-    intercept[SparkException] {
-      piped.collect()
-    }
-  }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
deleted file mode 100644
index e306952..0000000
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ /dev/null
@@ -1,389 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import scala.collection.mutable.HashMap
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Timeouts._
-import org.scalatest.time.{Span, Millis}
-import spark.SparkContext._
-import spark.rdd._
-import scala.collection.parallel.mutable
-
-/**
- * Tests for core RDD behavior: basic transformations and actions, caching, empty RDDs,
- * coalescing (with and without locality preferences), zipping, partition pruning, the
- * mapWith/flatMapWith/filterWith operators, top/takeOrdered, takeSample and runJob.
- */
-class RDDSuite extends FunSuite with SharedSparkContext {
-
-  test("basic operations") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    assert(nums.collect().toList === List(1, 2, 3, 4))
-    val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
-    assert(dups.distinct().count() === 4)
-    assert(dups.distinct.count === 4) // Can distinct and count be called without parentheses?
-    assert(dups.distinct.collect === dups.distinct().collect)
-    assert(dups.distinct(2).collect === dups.distinct().collect)
-    assert(nums.reduce(_ + _) === 10)
-    assert(nums.fold(0)(_ + _) === 10)
-    assert(nums.map(_.toString).collect().toList === List("1", "2", "3", "4"))
-    assert(nums.filter(_ > 2).collect().toList === List(3, 4))
-    assert(nums.flatMap(x => 1 to x).collect().toList === List(1, 1, 2, 1, 2, 3, 1, 2, 3, 4))
-    assert(nums.union(nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
-    assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
-    assert(nums.collect({ case i if i >= 3 => i.toString }).collect().toList === List("3", "4"))
-    assert(nums.keyBy(_.toString).collect().toList ===
-      List(("1", 1), ("2", 2), ("3", 3), ("4", 4)))
-    val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
-    assert(partitionSums.collect().toList === List(3, 7))
-
-    val partitionSumsWithSplit = nums.mapPartitionsWithSplit {
-      case (split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
-    }
-    assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7)))
-
-    val partitionSumsWithIndex = nums.mapPartitionsWithIndex {
-      case (split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
-    }
-    assert(partitionSumsWithIndex.collect().toList === List((0, 3), (1, 7)))
-
-    // Reducing an RDD that has no elements must fail rather than return a value.
-    intercept[UnsupportedOperationException] {
-      nums.filter(_ > 5).reduce(_ + _)
-    }
-  }
-
-  test("SparkContext.union") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
-    assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
-    assert(sc.union(Seq(nums)).collect().toList === List(1, 2, 3, 4))
-    assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
-  }
-
-  test("aggregate") {
-    val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
-    type StringMap = HashMap[String, Int]
-    // Missing keys read as 0 so `map(key) += n` works for first occurrences.
-    val emptyMap = new StringMap {
-      override def default(key: String): Int = 0
-    }
-    val mergeElement: (StringMap, (String, Int)) => StringMap = (map, pair) => {
-      map(pair._1) += pair._2
-      map
-    }
-    val mergeMaps: (StringMap, StringMap) => StringMap = (map1, map2) => {
-      for ((key, value) <- map2) {
-        map1(key) += value
-      }
-      map1
-    }
-    val result = pairs.aggregate(emptyMap)(mergeElement, mergeMaps)
-    assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
-  }
-
-  test("basic caching") {
-    val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-    assert(rdd.collect().toList === List(1, 2, 3, 4))
-    assert(rdd.collect().toList === List(1, 2, 3, 4))
-    assert(rdd.collect().toList === List(1, 2, 3, 4))
-  }
-
-  test("caching with failures") {
-    val onlySplit = new Partition { override def index: Int = 0 }
-    var shouldFail = true
-    val rdd = new RDD[Int](sc, Nil) {
-      override def getPartitions: Array[Partition] = Array(onlySplit)
-      override val getDependencies = List[Dependency[_]]()
-      override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
-        if (shouldFail) {
-          throw new Exception("injected failure")
-        } else {
-          Array(1, 2, 3, 4).iterator
-        }
-      }
-    }.cache()
-    val thrown = intercept[Exception] {
-      rdd.collect()
-    }
-    assert(thrown.getMessage.contains("injected failure"))
-    // A failed computation must not leave a poisoned cache entry behind.
-    shouldFail = false
-    assert(rdd.collect().toList === List(1, 2, 3, 4))
-  }
-
-  test("empty RDD") {
-    val empty = new EmptyRDD[Int](sc)
-    assert(empty.count === 0)
-    assert(empty.collect().size === 0)
-
-    val thrown = intercept[UnsupportedOperationException] {
-      empty.reduce(_ + _)
-    }
-    assert(thrown.getMessage.contains("empty"))
-
-    val emptyKv = new EmptyRDD[(Int, Int)](sc)
-    val rdd = sc.parallelize(1 to 2, 2).map(x => (x, x))
-    assert(rdd.join(emptyKv).collect().size === 0)
-    assert(rdd.rightOuterJoin(emptyKv).collect().size === 0)
-    assert(rdd.leftOuterJoin(emptyKv).collect().size === 2)
-    assert(rdd.cogroup(emptyKv).collect().size === 2)
-    assert(rdd.union(emptyKv).collect().size === 2)
-  }
-
-  test("coalesced RDDs") {
-    val data = sc.parallelize(1 to 10, 10)
-
-    val coalesced1 = data.coalesce(2)
-    assert(coalesced1.collect().toList === (1 to 10).toList)
-    assert(coalesced1.glom().collect().map(_.toList).toList ===
-      List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
-
-    // Check that the narrow dependency is also specified correctly
-    assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(0).toList ===
-      List(0, 1, 2, 3, 4))
-    assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList ===
-      List(5, 6, 7, 8, 9))
-
-    val coalesced2 = data.coalesce(3)
-    assert(coalesced2.collect().toList === (1 to 10).toList)
-    assert(coalesced2.glom().collect().map(_.toList).toList ===
-      List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
-
-    val coalesced3 = data.coalesce(10)
-    assert(coalesced3.collect().toList === (1 to 10).toList)
-    assert(coalesced3.glom().collect().map(_.toList).toList ===
-      (1 to 10).map(x => List(x)).toList)
-
-    // If we try to coalesce into more partitions than the original RDD, it should just
-    // keep the original number of partitions.
-    val coalesced4 = data.coalesce(20)
-    assert(coalesced4.collect().toList === (1 to 10).toList)
-    assert(coalesced4.glom().collect().map(_.toList).toList ===
-      (1 to 10).map(x => List(x)).toList)
-
-    // Coalescing with shuffle = true keeps the upstream stage parallel: the result's
-    // grandparent RDD must be a ShuffledRDD. (asInstanceOf(...) != null could never fail.)
-    val coalesced5 = data.coalesce(1, shuffle = true)
-    val shuffleParent = coalesced5.dependencies.head.rdd.dependencies.head.rdd
-    assert(shuffleParent.isInstanceOf[ShuffledRDD[_, _, _]],
-      "coalesce with shuffle = true should be backed by a ShuffledRDD")
-  }
-
-  test("coalesced RDDs with locality") {
-    val data3 = sc.makeRDD(List((1, List("a", "c")), (2, List("a", "b", "c")), (3, List("b"))))
-    val coal3 = data3.coalesce(3)
-    val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
-    assert(list3.sorted === Array("a", "b", "c"), "Locality preferences are dropped")
-
-    // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
-    val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i + 2)).map { j => "m" + (j % 6) })))
-    val coalesced1 = data.coalesce(3)
-    assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
-
-    val splits = coalesced1.glom().collect().map(_.toList).toList
-    assert(splits.length === 3, "Supposed to coalesce to 3 but got " + splits.length)
-
-    assert(splits.forall(_.nonEmpty), "Some partitions were empty")
-
-    // If we try to coalesce into more partitions than the original RDD, it should just
-    // keep the original number of partitions.
-    val coalesced4 = data.coalesce(20)
-    val listOfLists = coalesced4.glom().collect().map(_.toList).toList
-    // Non-empty partitions first, ordered by their first element.
-    val sortedList = listOfLists.sortWith { (x, y) => !x.isEmpty && (y.isEmpty || (x(0) < y(0))) }
-    assert(sortedList === (1 to 9).map { x => List(x) }.toList,
-      "Tried coalescing 9 partitions to 20 but didn't get 9 back")
-  }
-
-  test("coalesced RDDs with locality, large scale (10K partitions)") {
-    // large scale experiment
-    val rnd = scala.util.Random
-    val partitions = 10000
-    val numMachines = 50
-    val machines = (1 to numMachines).map("m" + _)
-
-    // Each block prefers 3 randomly chosen machines.
-    val blocks = (1 to partitions).map(i =>
-      (i, Array.fill(3)(machines(rnd.nextInt(machines.size))).toList))
-
-    val data2 = sc.makeRDD(blocks)
-    val coalesced2 = data2.coalesce(numMachines * 2)
-
-    // test that you get over 90% locality in each group
-    val minLocality = coalesced2.partitions
-      .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
-      .foldLeft(1.0)((perc, loc) => math.min(perc, loc))
-    assert(minLocality >= 0.90,
-      "Expected 90% locality but got " + (minLocality * 100.0).toInt + "%")
-
-    // test that the groups are load balanced with 100 +/- 20 elements in each
-    val maxImbalance = coalesced2.partitions
-      .map(part => part.asInstanceOf[CoalescedRDDPartition].parents.size)
-      .foldLeft(0)((dev, curr) => math.max(math.abs(100 - curr), dev))
-    assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
-
-    val data3 = sc.makeRDD(blocks).map(i => i * 2) // derived RDD to test *current* pref locs
-    val coalesced3 = data3.coalesce(numMachines * 2)
-    val minLocality2 = coalesced3.partitions
-      .map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
-      .foldLeft(1.0)((perc, loc) => math.min(perc, loc))
-    assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
-      (minLocality2 * 100.0).toInt + "%")
-  }
-
-  test("zipped RDDs") {
-    val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val zipped = nums.zip(nums.map(_ + 1.0))
-    assert(zipped.glom().map(_.toList).collect().toList ===
-      List(List((1, 2.0), (2, 3.0)), List((3, 4.0), (4, 5.0))))
-
-    // Zipping RDDs with different partition counts is rejected.
-    intercept[IllegalArgumentException] {
-      nums.zip(sc.parallelize(1 to 4, 1)).collect()
-    }
-  }
-
-  test("partition pruning") {
-    val data = sc.parallelize(1 to 10, 10)
-    // Note that split number starts from 0, so > 8 means only 10th partition left.
-    val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8)
-    assert(prunedRdd.partitions.size === 1)
-    val prunedData = prunedRdd.collect()
-    assert(prunedData.size === 1)
-    assert(prunedData(0) === 10)
-  }
-
-  test("mapWith") {
-    import java.util.Random
-    val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
-    val randoms = ones.mapWith(
-      (index: Int) => new Random(index + 42)) {
-      (t: Int, prng: Random) => prng.nextDouble * t
-    }.collect()
-    // Third draw of each per-partition generator (seeds 42 and 43).
-    val prn42_3 = {
-      val prng42 = new Random(42)
-      prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
-    }
-    val prn43_3 = {
-      val prng43 = new Random(43)
-      prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
-    }
-    assert(randoms(2) === prn42_3)
-    assert(randoms(5) === prn43_3)
-  }
-
-  test("flatMapWith") {
-    import java.util.Random
-    val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
-    val randoms = ones.flatMapWith(
-      (index: Int) => new Random(index + 42)) {
-      (t: Int, prng: Random) =>
-        val random = prng.nextDouble()
-        Seq(random * t, random * t * 10)
-    }.collect()
-    val prn42_3 = {
-      val prng42 = new Random(42)
-      prng42.nextDouble(); prng42.nextDouble(); prng42.nextDouble()
-    }
-    val prn43_3 = {
-      val prng43 = new Random(43)
-      prng43.nextDouble(); prng43.nextDouble(); prng43.nextDouble()
-    }
-    assert(randoms(5) === prn42_3 * 10)
-    assert(randoms(11) === prn43_3 * 10)
-  }
-
-  test("filterWith") {
-    import java.util.Random
-    val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
-    val sample = ints.filterWith(
-      (index: Int) => new Random(index + 42)) {
-      (t: Int, prng: Random) => prng.nextInt(3) == 0
-    }.collect()
-    // Replay the same per-partition generators locally: elements 1-3 live in partition 0
-    // (seed 42) and 4-6 in partition 1 (seed 43).
-    val checkSample = {
-      val prng42 = new Random(42)
-      val prng43 = new Random(43)
-      Array(1, 2, 3, 4, 5, 6).filter { i =>
-        if (i < 4) 0 == prng42.nextInt(3)
-        else 0 == prng43.nextInt(3)
-      }
-    }
-    assert(sample.size === checkSample.size)
-    for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
-  }
-
-  test("top with predefined ordering") {
-    val nums = Array.range(1, 100000)
-    val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
-    val topK = ints.top(5)
-    assert(topK.size === 5)
-    assert(topK === nums.reverse.take(5))
-  }
-
-  test("top with custom ordering") {
-    val words = Vector("a", "b", "c", "d")
-    implicit val ord = implicitly[Ordering[String]].reverse
-    val rdd = sc.makeRDD(words, 2)
-    val topK = rdd.top(2)
-    assert(topK.size === 2)
-    assert(topK.sorted === Array("b", "a"))
-  }
-
-  test("takeOrdered with predefined ordering") {
-    val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-    val rdd = sc.makeRDD(nums, 2)
-    val sortedLowerK = rdd.takeOrdered(5)
-    assert(sortedLowerK.size === 5)
-    assert(sortedLowerK === Array(1, 2, 3, 4, 5))
-  }
-
-  test("takeOrdered with custom ordering") {
-    val nums = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
-    implicit val ord = implicitly[Ordering[Int]].reverse
-    val rdd = sc.makeRDD(nums, 2)
-    val sortedTopK = rdd.takeOrdered(5)
-    assert(sortedTopK.size === 5)
-    assert(sortedTopK === Array(10, 9, 8, 7, 6))
-    assert(sortedTopK === nums.sorted(ord).take(5))
-  }
-
-  test("takeSample") {
-    val data = sc.parallelize(1 to 100, 2)
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement = false, 20, seed)
-      assert(sample.size === 20) // Got exactly 20 elements
-      assert(sample.toSet.size === 20) // Elements are distinct
-      assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
-    }
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement = false, 200, seed)
-      assert(sample.size === 100) // Got only 100 elements
-      assert(sample.toSet.size === 100) // Elements are distinct
-      assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
-    }
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement = true, 20, seed)
-      assert(sample.size === 20) // Got exactly 20 elements
-      assert(sample.forall(x => 1 <= x && x <= 100), "elements not in [1, 100]")
-    }
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement = true, 100, seed)
-      assert(sample.size === 100) // Got exactly 100 elements
-      // Chance of getting all distinct elements is astronomically low, so test we got < 100
-      assert(sample.toSet.size < 100, "sampling with replacement returned all distinct elements")
-    }
-    for (seed <- 1 to 5) {
-      val sample = data.takeSample(withReplacement = true, 200, seed)
-      assert(sample.size === 200) // Got exactly 200 elements
-      // 200 draws cannot all be distinct; instead check the draws did not happen to cover
-      // every one of the 100 values, which is very unlikely when sampling with replacement.
-      assert(sample.toSet.size < 100, "sampling with replacement covered every element")
-    }
-  }
-
-  test("runJob on an invalid partition") {
-    // Partition 2 does not exist in a 2-partition RDD.
-    intercept[IllegalArgumentException] {
-      sc.runJob(sc.parallelize(1 to 10, 2), { iter: Iterator[Int] => iter.size }, Seq(0, 1, 2), false)
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/SharedSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/SharedSparkContext.scala b/core/src/test/scala/spark/SharedSparkContext.scala
deleted file mode 100644
index 70c2451..0000000
--- a/core/src/test/scala/spark/SharedSparkContext.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.Suite
-import org.scalatest.BeforeAndAfterAll
-
-/**
- * Shares a local `SparkContext` between all tests in a suite and closes it at the end.
- *
- * The context is created in `beforeAll` and stopped in `afterAll`. `super.afterAll()` runs
- * even if stopping the context throws, so cleanup from other mixed-in traits is not skipped.
- */
-trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
-
-  @transient private var _sc: SparkContext = _
-
-  /** The shared context; `null` before `beforeAll` and after `afterAll`. */
-  def sc: SparkContext = _sc
-
-  override def beforeAll() {
-    _sc = new SparkContext("local", "test")
-    super.beforeAll()
-  }
-
-  override def afterAll() {
-    try {
-      if (_sc != null) {
-        LocalSparkContext.stop(_sc)
-      }
-    } finally {
-      _sc = null
-      super.afterAll()
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ShuffleNettySuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ShuffleNettySuite.scala b/core/src/test/scala/spark/ShuffleNettySuite.scala
deleted file mode 100644
index 6bad6c1..0000000
--- a/core/src/test/scala/spark/ShuffleNettySuite.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.BeforeAndAfterAll
-
-
-/** Runs every test in [[ShuffleSuite]] with the Netty shuffle enabled. */
-class ShuffleNettySuite extends ShuffleSuite with BeforeAndAfterAll {
-
-  // Value of the property before this suite ran (None if unset), restored in afterAll.
-  private var previousNettySetting: Option[String] = None
-
-  override def beforeAll(configMap: Map[String, Any]) {
-    previousNettySetting = Option(System.getProperty("spark.shuffle.use.netty"))
-    System.setProperty("spark.shuffle.use.netty", "true")
-    // Delegate so the before-all hooks of ShuffleSuite's mixins (e.g. LocalSparkContext)
-    // still run; the original override silently bypassed them.
-    super.beforeAll(configMap)
-  }
-
-  override def afterAll(configMap: Map[String, Any]) {
-    try {
-      super.afterAll(configMap)
-    } finally {
-      // Restore the prior state instead of forcing "false", so later suites see what they
-      // would have seen had this suite not run.
-      previousNettySetting match {
-        case Some(value) => System.setProperty("spark.shuffle.use.netty", value)
-        case None => System.clearProperty("spark.shuffle.use.netty")
-      }
-    }
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
deleted file mode 100644
index 8745689..0000000
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.matchers.ShouldMatchers
-
-import spark.SparkContext._
-import spark.ShuffleSuite.NonJavaSerializableClass
-import spark.rdd.{SubtractedRDD, CoGroupedRDD, OrderedRDDFunctions, ShuffledRDD}
-import spark.util.MutablePair
-
-
-class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
- test("groupByKey without compression") {
- try {
- System.setProperty("spark.shuffle.compress", "false")
- sc = new SparkContext("local", "test")
- val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
- val groups = pairs.groupByKey(4).collect()
- assert(groups.size === 2)
- val valuesFor1 = groups.find(_._1 == 1).get._2
- assert(valuesFor1.toList.sorted === List(1, 2, 3))
- val valuesFor2 = groups.find(_._1 == 2).get._2
- assert(valuesFor2.toList.sorted === List(1))
- } finally {
- System.setProperty("spark.shuffle.compress", "true")
- }
- }
-
- test("shuffle non-zero block size") {
- sc = new SparkContext("local-cluster[2,1,512]", "test")
- val NUM_BLOCKS = 3
-
- val a = sc.parallelize(1 to 10, 2)
- val b = a.map { x =>
- (x, new NonJavaSerializableClass(x * 2))
- }
- // If the Kryo serializer is not used correctly, the shuffle would fail because the
- // default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
- b, new HashPartitioner(NUM_BLOCKS)).setSerializer(classOf[spark.KryoSerializer].getName)
- val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
-
- assert(c.count === 10)
-
- // All blocks must have non-zero size
- (0 until NUM_BLOCKS).foreach { id =>
- val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
- assert(statuses.forall(s => s._2 > 0))
- }
- }
-
- test("shuffle serializer") {
- // Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
- val a = sc.parallelize(1 to 10, 2)
- val b = a.map { x =>
- (x, new NonJavaSerializableClass(x * 2))
- }
- // If the Kryo serializer is not used correctly, the shuffle would fail because the
- // default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
- b, new HashPartitioner(3)).setSerializer(classOf[spark.KryoSerializer].getName)
- assert(c.count === 10)
- }
-
- test("zero sized blocks") {
- // Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
-
- // 10 partitions from 4 keys
- val NUM_BLOCKS = 10
- val a = sc.parallelize(1 to 4, NUM_BLOCKS)
- val b = a.map(x => (x, x*2))
-
- // NOTE: The default Java serializer doesn't create zero-sized blocks.
- // So, use Kryo
- val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
- .setSerializer(classOf[spark.KryoSerializer].getName)
-
- val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
- assert(c.count === 4)
-
- val blockSizes = (0 until NUM_BLOCKS).flatMap { id =>
- val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
- statuses.map(x => x._2)
- }
- val nonEmptyBlocks = blockSizes.filter(x => x > 0)
-
- // We should have at most 4 non-zero sized partitions
- assert(nonEmptyBlocks.size <= 4)
- }
-
- test("zero sized blocks without kryo") {
- // Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
-
- // 10 partitions from 4 keys
- val NUM_BLOCKS = 10
- val a = sc.parallelize(1 to 4, NUM_BLOCKS)
- val b = a.map(x => (x, x*2))
-
- // NOTE: The default Java serializer should create zero-sized blocks
- val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
-
- val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
- assert(c.count === 4)
-
- val blockSizes = (0 until NUM_BLOCKS).flatMap { id =>
- val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, id)
- statuses.map(x => x._2)
- }
- val nonEmptyBlocks = blockSizes.filter(x => x > 0)
-
- // We should have at most 4 non-zero sized partitions
- assert(nonEmptyBlocks.size <= 4)
- }
-
- test("shuffle using mutable pairs") {
- // Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
- def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
- val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
- val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
- val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
- .collect()
-
- data.foreach { pair => results should contain (pair) }
- }
-
- test("sorting using mutable pairs") {
- // This is not in SortingSuite because of the local cluster setup.
- // Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
- def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
- val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
- val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
- val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs)
- .sortByKey().collect()
- results(0) should be (p(1, 11))
- results(1) should be (p(2, 22))
- results(2) should be (p(3, 33))
- results(3) should be (p(100, 100))
- }
-
- test("cogroup using mutable pairs") {
- // Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
- def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
- val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
- val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
- val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
- val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
- val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
-
- assert(results(1)(0).length === 3)
- assert(results(1)(0).contains(1))
- assert(results(1)(0).contains(2))
- assert(results(1)(0).contains(3))
- assert(results(1)(1).length === 2)
- assert(results(1)(1).contains("11"))
- assert(results(1)(1).contains("12"))
- assert(results(2)(0).length === 1)
- assert(results(2)(0).contains(1))
- assert(results(2)(1).length === 1)
- assert(results(2)(1).contains("22"))
- assert(results(3)(0).length === 0)
- assert(results(3)(1).contains("3"))
- }
-
- test("subtract mutable pairs") {
- // Use a local cluster with 2 processes to make sure there are both local and remote blocks
- sc = new SparkContext("local-cluster[2,1,512]", "test")
- def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
- val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
- val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
- val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
- val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
- val results = new SubtractedRDD(pairs1, pairs2, new HashPartitioner(2)).collect()
- results should have length (1)
- // substracted rdd return results as Tuple2
- results(0) should be ((3, 33))
- }
-}
-
-object ShuffleSuite {
-
- def mergeCombineException(x: Int, y: Int): Int = {
- throw new SparkException("Exception for map-side combine.")
- x + y
- }
-
- class NonJavaSerializableClass(val value: Int)
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/SizeEstimatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/SizeEstimatorSuite.scala b/core/src/test/scala/spark/SizeEstimatorSuite.scala
deleted file mode 100644
index 1ef812d..0000000
--- a/core/src/test/scala/spark/SizeEstimatorSuite.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.PrivateMethodTester
-
-class DummyClass1 {}
-
-class DummyClass2 {
- val x: Int = 0
-}
-
-class DummyClass3 {
- val x: Int = 0
- val y: Double = 0.0
-}
-
-class DummyClass4(val d: DummyClass3) {
- val x: Int = 0
-}
-
-object DummyString {
- def apply(str: String) : DummyString = new DummyString(str.toArray)
-}
-class DummyString(val arr: Array[Char]) {
- override val hashCode: Int = 0
- // JDK-7 has an extra hash32 field http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/rev/11987e85555f
- @transient val hash32: Int = 0
-}
-
-class SizeEstimatorSuite
- extends FunSuite with BeforeAndAfterAll with PrivateMethodTester {
-
- var oldArch: String = _
- var oldOops: String = _
-
- override def beforeAll() {
- // Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
- oldArch = System.setProperty("os.arch", "amd64")
- oldOops = System.setProperty("spark.test.useCompressedOops", "true")
- }
-
- override def afterAll() {
- resetOrClear("os.arch", oldArch)
- resetOrClear("spark.test.useCompressedOops", oldOops)
- }
-
- test("simple classes") {
- assert(SizeEstimator.estimate(new DummyClass1) === 16)
- assert(SizeEstimator.estimate(new DummyClass2) === 16)
- assert(SizeEstimator.estimate(new DummyClass3) === 24)
- assert(SizeEstimator.estimate(new DummyClass4(null)) === 24)
- assert(SizeEstimator.estimate(new DummyClass4(new DummyClass3)) === 48)
- }
-
- // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
- // (Sun vs IBM). Use a DummyString class to make tests deterministic.
- test("strings") {
- assert(SizeEstimator.estimate(DummyString("")) === 40)
- assert(SizeEstimator.estimate(DummyString("a")) === 48)
- assert(SizeEstimator.estimate(DummyString("ab")) === 48)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
- }
-
- test("primitive arrays") {
- assert(SizeEstimator.estimate(new Array[Byte](10)) === 32)
- assert(SizeEstimator.estimate(new Array[Char](10)) === 40)
- assert(SizeEstimator.estimate(new Array[Short](10)) === 40)
- assert(SizeEstimator.estimate(new Array[Int](10)) === 56)
- assert(SizeEstimator.estimate(new Array[Long](10)) === 96)
- assert(SizeEstimator.estimate(new Array[Float](10)) === 56)
- assert(SizeEstimator.estimate(new Array[Double](10)) === 96)
- assert(SizeEstimator.estimate(new Array[Int](1000)) === 4016)
- assert(SizeEstimator.estimate(new Array[Long](1000)) === 8016)
- }
-
- test("object arrays") {
- // Arrays containing nulls should just have one pointer per element
- assert(SizeEstimator.estimate(new Array[String](10)) === 56)
- assert(SizeEstimator.estimate(new Array[AnyRef](10)) === 56)
-
- // For object arrays with non-null elements, each object should take one pointer plus
- // however many bytes that class takes. (Note that Array.fill calls the code in its
- // second parameter separately for each object, so we get distinct objects.)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass1)) === 216)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass2)) === 216)
- assert(SizeEstimator.estimate(Array.fill(10)(new DummyClass3)) === 296)
- assert(SizeEstimator.estimate(Array(new DummyClass1, new DummyClass2)) === 56)
-
- // Past size 100, our samples 100 elements, but we should still get the right size.
- assert(SizeEstimator.estimate(Array.fill(1000)(new DummyClass3)) === 28016)
-
- // If an array contains the *same* element many times, we should only count it once.
- val d1 = new DummyClass1
- assert(SizeEstimator.estimate(Array.fill(10)(d1)) === 72) // 10 pointers plus 8-byte object
- assert(SizeEstimator.estimate(Array.fill(100)(d1)) === 432) // 100 pointers plus 8-byte object
-
- // Same thing with huge array containing the same element many times. Note that this won't
- // return exactly 4032 because it can't tell that *all* the elements will equal the first
- // one it samples, but it should be close to that.
-
- // TODO: If we sample 100 elements, this should always be 4176 ?
- val estimatedSize = SizeEstimator.estimate(Array.fill(1000)(d1))
- assert(estimatedSize >= 4000, "Estimated size " + estimatedSize + " should be more than 4000")
- assert(estimatedSize <= 4200, "Estimated size " + estimatedSize + " should be less than 4100")
- }
-
- test("32-bit arch") {
- val arch = System.setProperty("os.arch", "x86")
-
- val initialize = PrivateMethod[Unit]('initialize)
- SizeEstimator invokePrivate initialize()
-
- assert(SizeEstimator.estimate(DummyString("")) === 40)
- assert(SizeEstimator.estimate(DummyString("a")) === 48)
- assert(SizeEstimator.estimate(DummyString("ab")) === 48)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 56)
-
- resetOrClear("os.arch", arch)
- }
-
- // NOTE: The String class definition varies across JDK versions (1.6 vs. 1.7) and vendors
- // (Sun vs IBM). Use a DummyString class to make tests deterministic.
- test("64-bit arch with no compressed oops") {
- val arch = System.setProperty("os.arch", "amd64")
- val oops = System.setProperty("spark.test.useCompressedOops", "false")
-
- val initialize = PrivateMethod[Unit]('initialize)
- SizeEstimator invokePrivate initialize()
-
- assert(SizeEstimator.estimate(DummyString("")) === 56)
- assert(SizeEstimator.estimate(DummyString("a")) === 64)
- assert(SizeEstimator.estimate(DummyString("ab")) === 64)
- assert(SizeEstimator.estimate(DummyString("abcdefgh")) === 72)
-
- resetOrClear("os.arch", arch)
- resetOrClear("spark.test.useCompressedOops", oops)
- }
-
- def resetOrClear(prop: String, oldValue: String) {
- if (oldValue != null) {
- System.setProperty(prop, oldValue)
- } else {
- System.clearProperty(prop)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/SortingSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
deleted file mode 100644
index b933c4a..0000000
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-import org.scalatest.matchers.ShouldMatchers
-import SparkContext._
-
-class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers with Logging {
-
- test("sortByKey") {
- val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
- assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
- }
-
- test("large array") {
- val rand = new scala.util.Random()
- val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
- val pairs = sc.parallelize(pairArr, 2)
- val sorted = pairs.sortByKey()
- assert(sorted.partitions.size === 2)
- assert(sorted.collect() === pairArr.sortBy(_._1))
- }
-
- test("large array with one split") {
- val rand = new scala.util.Random()
- val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
- val pairs = sc.parallelize(pairArr, 2)
- val sorted = pairs.sortByKey(true, 1)
- assert(sorted.partitions.size === 1)
- assert(sorted.collect() === pairArr.sortBy(_._1))
- }
-
- test("large array with many partitions") {
- val rand = new scala.util.Random()
- val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
- val pairs = sc.parallelize(pairArr, 2)
- val sorted = pairs.sortByKey(true, 20)
- assert(sorted.partitions.size === 20)
- assert(sorted.collect() === pairArr.sortBy(_._1))
- }
-
- test("sort descending") {
- val rand = new scala.util.Random()
- val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
- val pairs = sc.parallelize(pairArr, 2)
- assert(pairs.sortByKey(false).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
- }
-
- test("sort descending with one split") {
- val rand = new scala.util.Random()
- val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
- val pairs = sc.parallelize(pairArr, 1)
- assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
- }
-
- test("sort descending with many partitions") {
- val rand = new scala.util.Random()
- val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
- val pairs = sc.parallelize(pairArr, 2)
- assert(pairs.sortByKey(false, 20).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
- }
-
- test("more partitions than elements") {
- val rand = new scala.util.Random()
- val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) }
- val pairs = sc.parallelize(pairArr, 30)
- assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
- }
-
- test("empty RDD") {
- val pairArr = new Array[(Int, Int)](0)
- val pairs = sc.parallelize(pairArr, 2)
- assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
- }
-
- test("partition balancing") {
- val pairArr = (1 to 1000).map(x => (x, x)).toArray
- val sorted = sc.parallelize(pairArr, 4).sortByKey()
- assert(sorted.collect() === pairArr.sortBy(_._1))
- val partitions = sorted.collectPartitions()
- logInfo("Partition lengths: " + partitions.map(_.length).mkString(", "))
- partitions(0).length should be > 180
- partitions(1).length should be > 180
- partitions(2).length should be > 180
- partitions(3).length should be > 180
- partitions(0).last should be < partitions(1).head
- partitions(1).last should be < partitions(2).head
- partitions(2).last should be < partitions(3).head
- }
-
- test("partition balancing for descending sort") {
- val pairArr = (1 to 1000).map(x => (x, x)).toArray
- val sorted = sc.parallelize(pairArr, 4).sortByKey(false)
- assert(sorted.collect() === pairArr.sortBy(_._1).reverse)
- val partitions = sorted.collectPartitions()
- logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
- partitions(0).length should be > 180
- partitions(1).length should be > 180
- partitions(2).length should be > 180
- partitions(3).length should be > 180
- partitions(0).last should be > partitions(1).head
- partitions(1).last should be > partitions(2).head
- partitions(2).last should be > partitions(3).head
- }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/SparkContextInfoSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/SparkContextInfoSuite.scala b/core/src/test/scala/spark/SparkContextInfoSuite.scala
deleted file mode 100644
index 6d50bf5..0000000
--- a/core/src/test/scala/spark/SparkContextInfoSuite.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import spark.SparkContext._
-
-class SparkContextInfoSuite extends FunSuite with LocalSparkContext {
- test("getPersistentRDDs only returns RDDs that are marked as cached") {
- sc = new SparkContext("local", "test")
- assert(sc.getPersistentRDDs.isEmpty === true)
-
- val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2)
- assert(sc.getPersistentRDDs.isEmpty === true)
-
- rdd.cache()
- assert(sc.getPersistentRDDs.size === 1)
- assert(sc.getPersistentRDDs.values.head === rdd)
- }
-
- test("getPersistentRDDs returns an immutable map") {
- sc = new SparkContext("local", "test")
- val rdd1 = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
- val myRdds = sc.getPersistentRDDs
- assert(myRdds.size === 1)
- assert(myRdds.values.head === rdd1)
-
- val rdd2 = sc.makeRDD(Array(5, 6, 7, 8), 1).cache()
-
- // getPersistentRDDs should have 2 RDDs, but myRdds should not change
- assert(sc.getPersistentRDDs.size === 2)
- assert(myRdds.size === 1)
- }
-
- test("getRDDStorageInfo only reports on RDDs that actually persist data") {
- sc = new SparkContext("local", "test")
- val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
-
- assert(sc.getRDDStorageInfo.size === 0)
-
- rdd.collect()
- assert(sc.getRDDStorageInfo.size === 1)
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ThreadingSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ThreadingSuite.scala b/core/src/test/scala/spark/ThreadingSuite.scala
deleted file mode 100644
index f2acd0b..0000000
--- a/core/src/test/scala/spark/ThreadingSuite.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.util.concurrent.Semaphore
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.scalatest.FunSuite
-import org.scalatest.BeforeAndAfter
-
-import SparkContext._
-
-/**
- * Holds state shared across task threads in some ThreadingSuite tests.
- */
-object ThreadingSuiteState {
- val runningThreads = new AtomicInteger
- val failed = new AtomicBoolean
-
- def clear() {
- runningThreads.set(0)
- failed.set(false)
- }
-}
-
-class ThreadingSuite extends FunSuite with LocalSparkContext {
-
- test("accessing SparkContext form a different thread") {
- sc = new SparkContext("local", "test")
- val nums = sc.parallelize(1 to 10, 2)
- val sem = new Semaphore(0)
- @volatile var answer1: Int = 0
- @volatile var answer2: Int = 0
- new Thread {
- override def run() {
- answer1 = nums.reduce(_ + _)
- answer2 = nums.first() // This will run "locally" in the current thread
- sem.release()
- }
- }.start()
- sem.acquire()
- assert(answer1 === 55)
- assert(answer2 === 1)
- }
-
- test("accessing SparkContext form multiple threads") {
- sc = new SparkContext("local", "test")
- val nums = sc.parallelize(1 to 10, 2)
- val sem = new Semaphore(0)
- @volatile var ok = true
- for (i <- 0 until 10) {
- new Thread {
- override def run() {
- val answer1 = nums.reduce(_ + _)
- if (answer1 != 55) {
- printf("In thread %d: answer1 was %d\n", i, answer1)
- ok = false
- }
- val answer2 = nums.first() // This will run "locally" in the current thread
- if (answer2 != 1) {
- printf("In thread %d: answer2 was %d\n", i, answer2)
- ok = false
- }
- sem.release()
- }
- }.start()
- }
- sem.acquire(10)
- if (!ok) {
- fail("One or more threads got the wrong answer from an RDD operation")
- }
- }
-
- test("accessing multi-threaded SparkContext form multiple threads") {
- sc = new SparkContext("local[4]", "test")
- val nums = sc.parallelize(1 to 10, 2)
- val sem = new Semaphore(0)
- @volatile var ok = true
- for (i <- 0 until 10) {
- new Thread {
- override def run() {
- val answer1 = nums.reduce(_ + _)
- if (answer1 != 55) {
- printf("In thread %d: answer1 was %d\n", i, answer1)
- ok = false
- }
- val answer2 = nums.first() // This will run "locally" in the current thread
- if (answer2 != 1) {
- printf("In thread %d: answer2 was %d\n", i, answer2)
- ok = false
- }
- sem.release()
- }
- }.start()
- }
- sem.acquire(10)
- if (!ok) {
- fail("One or more threads got the wrong answer from an RDD operation")
- }
- }
-
- test("parallel job execution") {
- // This test launches two jobs with two threads each on a 4-core local cluster. Each thread
- // waits until there are 4 threads running at once, to test that both jobs have been launched.
- sc = new SparkContext("local[4]", "test")
- val nums = sc.parallelize(1 to 2, 2)
- val sem = new Semaphore(0)
- ThreadingSuiteState.clear()
- for (i <- 0 until 2) {
- new Thread {
- override def run() {
- val ans = nums.map(number => {
- val running = ThreadingSuiteState.runningThreads
- running.getAndIncrement()
- val time = System.currentTimeMillis()
- while (running.get() != 4 && System.currentTimeMillis() < time + 1000) {
- Thread.sleep(100)
- }
- if (running.get() != 4) {
- println("Waited 1 second without seeing runningThreads = 4 (it was " +
- running.get() + "); failing test")
- ThreadingSuiteState.failed.set(true)
- }
- number
- }).collect()
- assert(ans.toList === List(1, 2))
- sem.release()
- }
- }.start()
- }
- sem.acquire(2)
- if (ThreadingSuiteState.failed.get()) {
- fail("One or more threads didn't see runningThreads = 4")
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/UnpersistSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/UnpersistSuite.scala b/core/src/test/scala/spark/UnpersistSuite.scala
deleted file mode 100644
index 93977d1..0000000
--- a/core/src/test/scala/spark/UnpersistSuite.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Timeouts._
-import org.scalatest.time.{Span, Millis}
-import spark.SparkContext._
-
-class UnpersistSuite extends FunSuite with LocalSparkContext {
- test("unpersist RDD") {
- sc = new SparkContext("local", "test")
- val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
- rdd.count
- assert(sc.persistentRdds.isEmpty === false)
- rdd.unpersist()
- assert(sc.persistentRdds.isEmpty === true)
-
- failAfter(Span(3000, Millis)) {
- try {
- while (! sc.getRDDStorageInfo.isEmpty) {
- Thread.sleep(200)
- }
- } catch {
- case _ => { Thread.sleep(10) }
- // Do nothing. We might see exceptions because block manager
- // is racing this thread to remove entries from the driver.
- }
- }
- assert(sc.getRDDStorageInfo.isEmpty === true)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
deleted file mode 100644
index 98a6c1a..0000000
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import com.google.common.base.Charsets
-import com.google.common.io.Files
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File}
-import org.scalatest.FunSuite
-import org.apache.commons.io.FileUtils
-import scala.util.Random
-
-class UtilsSuite extends FunSuite {
-
-  test("bytesToString") {
-    assert(Utils.bytesToString(10) === "10.0 B")
-    assert(Utils.bytesToString(1500) === "1500.0 B")
-    assert(Utils.bytesToString(2000000) === "1953.1 KB")
-    assert(Utils.bytesToString(2097152) === "2.0 MB")
-    assert(Utils.bytesToString(2306867) === "2.2 MB")
-    assert(Utils.bytesToString(5368709120L) === "5.0 GB")
-    assert(Utils.bytesToString(5L * 1024L * 1024L * 1024L * 1024L) === "5.0 TB")
-  }
-
-  test("copyStream") {
-    // Input array initialization: 9000 random bytes.
-    val bytes = Array.ofDim[Byte](9000)
-    Random.nextBytes(bytes)
-
-    val os = new ByteArrayOutputStream()
-    Utils.copyStream(new ByteArrayInputStream(bytes), os)
-
-    // === (rather than .equals) reports both sides when the copy differs.
-    assert(os.toByteArray.toList === bytes.toList)
-  }
-
-  test("memoryStringToMb") {
-    assert(Utils.memoryStringToMb("1") === 0)
-    assert(Utils.memoryStringToMb("1048575") === 0)
-    assert(Utils.memoryStringToMb("3145728") === 3)
-
-    assert(Utils.memoryStringToMb("1024k") === 1)
-    assert(Utils.memoryStringToMb("5000k") === 4)
-    assert(Utils.memoryStringToMb("4024k") === Utils.memoryStringToMb("4024K"))
-
-    assert(Utils.memoryStringToMb("1024m") === 1024)
-    assert(Utils.memoryStringToMb("5000m") === 5000)
-    assert(Utils.memoryStringToMb("4024m") === Utils.memoryStringToMb("4024M"))
-
-    assert(Utils.memoryStringToMb("2g") === 2048)
-    assert(Utils.memoryStringToMb("3g") === Utils.memoryStringToMb("3G"))
-
-    assert(Utils.memoryStringToMb("2t") === 2097152)
-    assert(Utils.memoryStringToMb("3t") === Utils.memoryStringToMb("3T"))
-  }
-
-  test("splitCommandString") {
-    assert(Utils.splitCommandString("") === Seq())
-    assert(Utils.splitCommandString("a") === Seq("a"))
-    assert(Utils.splitCommandString("aaa") === Seq("aaa"))
-    assert(Utils.splitCommandString("a b c") === Seq("a", "b", "c"))
-    assert(Utils.splitCommandString(" a b\t c ") === Seq("a", "b", "c"))
-    assert(Utils.splitCommandString("a 'b c'") === Seq("a", "b c"))
-    assert(Utils.splitCommandString("a 'b c' d") === Seq("a", "b c", "d"))
-    assert(Utils.splitCommandString("'b c'") === Seq("b c"))
-    assert(Utils.splitCommandString("a \"b c\"") === Seq("a", "b c"))
-    assert(Utils.splitCommandString("a \"b c\" d") === Seq("a", "b c", "d"))
-    assert(Utils.splitCommandString("\"b c\"") === Seq("b c"))
-    assert(Utils.splitCommandString("a 'b\" c' \"d' e\"") === Seq("a", "b\" c", "d' e"))
-    assert(Utils.splitCommandString("a\t'b\nc'\nd") === Seq("a", "b\nc", "d"))
-    assert(Utils.splitCommandString("a \"b\\\\c\"") === Seq("a", "b\\c"))
-    assert(Utils.splitCommandString("a \"b\\\"c\"") === Seq("a", "b\"c"))
-    assert(Utils.splitCommandString("a 'b\\\"c'") === Seq("a", "b\\\"c"))
-    assert(Utils.splitCommandString("'a'b") === Seq("ab"))
-    assert(Utils.splitCommandString("'a''b'") === Seq("ab"))
-    assert(Utils.splitCommandString("\"a\"b") === Seq("ab"))
-    assert(Utils.splitCommandString("\"a\"\"b\"") === Seq("ab"))
-    assert(Utils.splitCommandString("''") === Seq(""))
-    assert(Utils.splitCommandString("\"\"") === Seq(""))
-  }
-
-  test("string formatting of time durations") {
-    val second = 1000
-    val minute = second * 60
-    val hour = minute * 60
-    def str = Utils.msDurationToString(_)
-
-    assert(str(123) === "123 ms")
-    assert(str(second) === "1.0 s")
-    assert(str(second + 462) === "1.5 s")
-    assert(str(hour) === "1.00 h")
-    assert(str(minute) === "1.0 m")
-    assert(str(minute + 4 * second + 34) === "1.1 m")
-    assert(str(10 * hour + minute + 4 * second) === "10.02 h")
-    assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h")
-  }
-
-  test("reading offset bytes of a file") {
-    val tmpDir2 = Files.createTempDir()
-    try {
-      val f1Path = new File(tmpDir2, "f1").getPath
-      val f1 = new FileOutputStream(f1Path)
-      try {
-        f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))
-      } finally {
-        f1.close()
-      }
-
-      // Read first few bytes
-      assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3")
-
-      // Read some middle bytes
-      assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6")
-
-      // Read last few bytes
-      assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n")
-
-      // Read some nonexistent bytes in the beginning
-      assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3")
-
-      // Read some nonexistent bytes at the end
-      assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n")
-
-      // Read some nonexistent bytes on both ends
-      assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
-    } finally {
-      // Delete the temp dir even when an assertion above fails.
-      FileUtils.deleteDirectory(tmpDir2)
-    }
-  }
-}
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/ZippedPartitionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala
deleted file mode 100644
index bb5d379..0000000
--- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import scala.collection.immutable.NumericRange
-
-import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
-
-import SparkContext._
-
-
-object ZippedPartitionsSuite {
-  /**
-   * Consumes the three partition iterators and emits their element counts,
-   * in argument order: ints, then strings, then doubles.
-   */
-  def procZippedData(i: Iterator[Int], s: Iterator[String], d: Iterator[Double]): Iterator[Int] = {
-    val counts = Seq(i.size, s.size, d.size)
-    counts.iterator
-  }
-}
-
-class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
-  test("print sizes") {
-    val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
-    val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
-
-    val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)
-
-    // procZippedData emits (int, string, double) element counts for each of the
-    // 2 zipped partitions, so 6 values are expected in total.
-    val obtainedSizes = zippedRDD.collect()
-    val expectedSizes = Array(2, 3, 1, 2, 3, 1)
-    // Compare as Seqs: one assertion covers length and contents, and a failure
-    // prints both sequences instead of just "false".
-    assert(obtainedSizes.toSeq === expectedSizes.toSeq)
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/io/CompressionCodecSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/io/CompressionCodecSuite.scala b/core/src/test/scala/spark/io/CompressionCodecSuite.scala
deleted file mode 100644
index 1ba82fe..0000000
--- a/core/src/test/scala/spark/io/CompressionCodecSuite.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.io
-
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
-
-import org.scalatest.FunSuite
-
-
-class CompressionCodecSuite extends FunSuite {
-
-  /**
-   * Round-trips a sequence of byte values through `codec`. It checks that every
-   * value decodes unchanged and that the decompressed stream ends right after them.
-   */
-  def testCodec(codec: CompressionCodec) {
-    val numValues = 1000
-
-    // Write 1000 values (i % 256, one byte each) to the output stream, compressed.
-    val outputStream = new ByteArrayOutputStream()
-    val out = codec.compressedOutputStream(outputStream)
-    for (i <- 0 until numValues) {
-      out.write(i % 256)
-    }
-    out.close()
-
-    // Read the 1000 values back.
-    val inputStream = new ByteArrayInputStream(outputStream.toByteArray)
-    val in = codec.compressedInputStream(inputStream)
-    for (i <- 0 until numValues) {
-      assert(in.read() === i % 256)
-    }
-    // Nothing should follow the values that were written (InputStream.read
-    // returns -1 at end of stream).
-    assert(in.read() === -1)
-    in.close()
-  }
-
-  test("default compression codec") {
-    val codec = CompressionCodec.createCodec()
-    assert(codec.getClass === classOf[SnappyCompressionCodec])
-    testCodec(codec)
-  }
-
-  test("lzf compression codec") {
-    val codec = CompressionCodec.createCodec(classOf[LZFCompressionCodec].getName)
-    assert(codec.getClass === classOf[LZFCompressionCodec])
-    testCodec(codec)
-  }
-
-  test("snappy compression codec") {
-    val codec = CompressionCodec.createCodec(classOf[SnappyCompressionCodec].getName)
-    assert(codec.getClass === classOf[SnappyCompressionCodec])
-    testCodec(codec)
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
deleted file mode 100644
index b0213b6..0000000
--- a/core/src/test/scala/spark/metrics/MetricsConfigSuite.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics
-
-import org.scalatest.{BeforeAndAfter, FunSuite}
-
-class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
-  var filePath: String = _
-
-  before {
-    // URL.getFile returns the path still percent-encoded (a space becomes "%20"),
-    // which is not a usable filesystem path. Converting through the URI decodes it.
-    filePath = new java.io.File(
-      getClass.getClassLoader.getResource("test_metrics_config.properties").toURI).getPath
-  }
-
-  test("MetricsConfig with default properties") {
-    // "dummy-file" is not a real config file, so only default properties are expected.
-    val conf = new MetricsConfig(Option("dummy-file"))
-    conf.initialize()
-
-    assert(conf.properties.size() === 5)
-    assert(conf.properties.getProperty("test-for-dummy") === null)
-
-    val property = conf.getInstance("random")
-    assert(property.size() === 3)
-    assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
-    assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
-    assert(property.getProperty("sink.servlet.sample") === "false")
-  }
-
-  test("MetricsConfig with properties set") {
-    val conf = new MetricsConfig(Option(filePath))
-    conf.initialize()
-
-    val masterProp = conf.getInstance("master")
-    assert(masterProp.size() === 6)
-    assert(masterProp.getProperty("sink.console.period") === "20")
-    assert(masterProp.getProperty("sink.console.unit") === "minutes")
-    assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
-    assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
-    assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
-    assert(masterProp.getProperty("sink.servlet.sample") === "false")
-
-    val workerProp = conf.getInstance("worker")
-    assert(workerProp.size() === 6)
-    assert(workerProp.getProperty("sink.console.period") === "10")
-    assert(workerProp.getProperty("sink.console.unit") === "seconds")
-    assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
-    assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
-    assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
-    assert(workerProp.getProperty("sink.servlet.sample") === "false")
-  }
-
-  test("MetricsConfig with subProperties") {
-    val conf = new MetricsConfig(Option(filePath))
-    conf.initialize()
-
-    val propCategories = conf.propertyCategories
-    assert(propCategories.size === 3)
-
-    val masterProp = conf.getInstance("master")
-    val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
-    assert(sourceProps.size === 1)
-    assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
-
-    val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
-    assert(sinkProps.size === 2)
-    assert(sinkProps.contains("console"))
-    assert(sinkProps.contains("servlet"))
-
-    val consoleProps = sinkProps("console")
-    assert(consoleProps.size() === 2)
-
-    val servletProps = sinkProps("servlet")
-    assert(servletProps.size() === 3)
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala b/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
deleted file mode 100644
index dc65ac6..0000000
--- a/core/src/test/scala/spark/metrics/MetricsSystemSuite.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.metrics
-
-import org.scalatest.{BeforeAndAfter, FunSuite}
-
-class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
-  var filePath: String = _
-  // Value of spark.metrics.conf before this suite overrode it; restored in `after`.
-  private var previousMetricsConf: Option[String] = None
-
-  before {
-    // URL.getFile returns the path still percent-encoded (a space becomes "%20"),
-    // which is not a usable filesystem path. Converting through the URI decodes it.
-    filePath = new java.io.File(
-      getClass.getClassLoader.getResource("test_metrics_system.properties").toURI).getPath
-    previousMetricsConf = Option(System.getProperty("spark.metrics.conf"))
-    System.setProperty("spark.metrics.conf", filePath)
-  }
-
-  after {
-    // System properties are JVM-wide; don't leak this suite's config into others.
-    previousMetricsConf match {
-      case Some(value) => System.setProperty("spark.metrics.conf", value)
-      case None => System.clearProperty("spark.metrics.conf")
-    }
-  }
-
-  test("MetricsSystem with default config") {
-    val metricsSystem = MetricsSystem.createMetricsSystem("default")
-    val sources = metricsSystem.sources
-    val sinks = metricsSystem.sinks
-
-    assert(sources.length === 0)
-    assert(sinks.length === 0)
-    assert(!metricsSystem.getServletHandlers.isEmpty)
-  }
-
-  test("MetricsSystem with sources add") {
-    val metricsSystem = MetricsSystem.createMetricsSystem("test")
-    val sources = metricsSystem.sources
-    val sinks = metricsSystem.sinks
-
-    assert(sources.length === 0)
-    assert(sinks.length === 1)
-    assert(!metricsSystem.getServletHandlers.isEmpty)
-
-    // The final assertion expects `sources` (read above) to reflect the registration.
-    val source = new spark.deploy.master.MasterSource(null)
-    metricsSystem.registerSource(source)
-    assert(sources.length === 1)
-  }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala b/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
deleted file mode 100644
index dc8ca94..0000000
--- a/core/src/test/scala/spark/rdd/JdbcRDDSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.scalatest.{ BeforeAndAfter, FunSuite }
-import spark.SparkContext._
-import spark.rdd.JdbcRDD
-import java.sql._
-
-class JdbcRDDSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
-  // Creates and fills table FOO in an embedded Derby database. If the table is
-  // already there (SQLState X0Y32), setup is skipped and the existing rows are used.
-  before {
-    Class.forName("org.apache.derby.jdbc.EmbeddedDriver")
-    val connection = DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb;create=true")
-    try {
-      val ddl = connection.createStatement()
-      ddl.execute("""
- CREATE TABLE FOO(
- ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1),
- DATA INTEGER
- )""")
-      ddl.close()
-
-      // 100 rows with DATA = 2, 4, ..., 200; ID comes from the identity column.
-      val insertRow = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)")
-      for (n <- 1 to 100) {
-        insertRow.setInt(1, n * 2)
-        insertRow.executeUpdate()
-      }
-      insertRow.close()
-    } catch {
-      // X0Y32: table already exists.
-      case existing: SQLException if existing.getSQLState == "X0Y32" =>
-    } finally {
-      connection.close()
-    }
-  }
-
-  test("basic functionality") {
-    sc = new SparkContext("local", "test")
-    val rdd = new JdbcRDD(
-      sc,
-      () => DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb"),
-      "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
-      1, 100, 3,
-      (row: ResultSet) => row.getInt(1)).cache()
-
-    // 2 + 4 + ... + 200 == 10100
-    assert(rdd.count === 100)
-    assert(rdd.reduce(_ + _) === 10100)
-  }
-
-  // Shuts the Derby engine down after each test. Derby signals a normal shutdown
-  // with an SQLException whose SQLState is XJ015, so that one is expected.
-  after {
-    try {
-      DriverManager.getConnection("jdbc:derby:;shutdown=true")
-    } catch {
-      case shutdown: SQLException if shutdown.getSQLState == "XJ015" =>
-    }
-  }
-}