You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by andrewor14 <gi...@git.apache.org> on 2015/10/15 00:30:41 UTC

[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

GitHub user andrewor14 opened a pull request:

    https://github.com/apache/spark/pull/9124

    [SPARK-11078] Ensure spilling tests actually spill

    #9084 uncovered that many tests that test spilling don't actually spill. This is a follow-up patch to fix that to ensure our unit tests actually catch potential bugs in spilling. The size of this patch is inflated by the refactoring of `ExternalSorterSuite`, which had a lot of duplicate code and logic.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/andrewor14/spark spilling-tests

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/9124.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #9124
    
----
commit fd34c25d7ab6d3506d42b2f0cd8c8f26b7726e50
Author: Andrew Or <an...@databricks.com>
Date:   2015-10-14T20:05:42Z

    Fix and clean up ExternalSorterSuite
    
    This commit does several things:
    - remove noisy warning in GrantEverythingMemoryManager
    - remove duplciate code in ExternalSorterSuite
    - add a force spill threshold to make it easier to verify spilling
    - ensure spilling tests actually spill in ExternalSorterSuite

commit 285c81c26b210bcbe88f055475b2a53ac61bb5c1
Author: Andrew Or <an...@databricks.com>
Date:   2015-10-14T21:33:51Z

    Fix spilling tests in ExternalAppendOnlyMapSuite

commit 7226933d2a40896b9c4d606eb2c0ab6437507431
Author: Andrew Or <an...@databricks.com>
Date:   2015-10-14T22:25:15Z

    Fix DistributedSuite

commit 1b7fa3d6d32b1e254a47706db7eccd915c7368aa
Author: Andrew Or <an...@databricks.com>
Date:   2015-10-14T22:28:25Z

    Merge branch 'master' of github.com:apache/spark into spilling-tests

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148282338
  
      [Test build #1904 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1904/console) for   PR 9124 at commit [`1b7fa3d`](https://github.com/apache/spark/commit/1b7fa3d6d32b1e254a47706db7eccd915c7368aa).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #9124: [SPARK-11078] Ensure spilling tests actually spill

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/9124
  
    @andrewor14 looks like we still have some failures of the form...
    
    ```
    - spilling with compression *** FAILED ***
      java.lang.Exception: Test failed with compression using codec org.apache.spark.io.LZ4CompressionCodec:
    
    assertion failed: expected groupByKey to spill, but did not
      at scala.Predef$.assert(Predef.scala:170)
      at org.apache.spark.TestUtils$.assertSpilled(TestUtils.scala:170)
      at org.apache.spark.util.collection.ExternalAppendOnlyMapSuite.org$apache$spark$util$collection$ExternalAppendOnlyMapSuite$$testSimpleSpilling(ExternalAppendOnlyMapSuite.scala:253)
      at org.apache.spark.util.collection.ExternalAppendOnlyMapSuite$$anonfun$10$$anonfun$apply$mcV$sp$8.apply(ExternalAppendOnlyMapSuite.scala:218)
      at org.apache.spark.util.collection.ExternalAppendOnlyMapSuite$$anonfun$10$$anonfun$apply$mcV$sp$8.apply(ExternalAppendOnlyMapSuite.scala:216)
      at scala.collection.immutable.Stream.foreach(Stream.scala:594)
      ...
    ```
    
    Do you think it's possible that somehow there's a race here in reporting the metrics? so that things are spilling but not reported to the listener by the time the number of spilled tasks is checked?
    
    I could try putting in an `eventually` to let it check repeatedly for a short time? any better ideas, or reasons to think that's not it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148562385
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9124#discussion_r42182568
  
    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala ---
    @@ -18,535 +18,91 @@
     package org.apache.spark.util.collection
     
     import scala.collection.mutable.ArrayBuffer
    -
     import scala.util.Random
     
     import org.apache.spark._
     import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
     
    -// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
     
     class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
    -  private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
    -    val conf = new SparkConf(loadDefaults)
    -    if (kryo) {
    -      conf.set("spark.serializer", classOf[KryoSerializer].getName)
    -    } else {
    -      // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
    -      // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
    -      conf.set("spark.serializer.objectStreamReset", "1")
    -      conf.set("spark.serializer", classOf[JavaSerializer].getName)
    -    }
    -    conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
    -    // Ensure that we actually have multiple batches per spill file
    -    conf.set("spark.shuffle.spill.batchSize", "10")
    -    conf.set("spark.testing.memory", "2000000")
    -    conf
    -  }
    -
    -  test("empty data stream with kryo ser") {
    -    emptyDataStream(createSparkConf(false, true))
    -  }
    -
    -  test("empty data stream with java ser") {
    -    emptyDataStream(createSparkConf(false, false))
    -  }
    -
    -  def emptyDataStream(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val ord = implicitly[Ordering[Int]]
    -
    -    // Both aggregator and ordering
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
    -    assert(sorter.iterator.toSeq === Seq())
    -    sorter.stop()
    -
    -    // Only aggregator
    -    val sorter2 = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(3)), None, None)
    -    assert(sorter2.iterator.toSeq === Seq())
    -    sorter2.stop()
    -
    -    // Only ordering
    -    val sorter3 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    assert(sorter3.iterator.toSeq === Seq())
    -    sorter3.stop()
    -
    -    // Neither aggregator nor ordering
    -    val sorter4 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), None, None)
    -    assert(sorter4.iterator.toSeq === Seq())
    -    sorter4.stop()
    -  }
    +  import TestUtils.{assertNotSpilled, assertSpilled}
     
    -  test("few elements per partition with kryo ser") {
    -    fewElementsPerPartition(createSparkConf(false, true))
    -  }
    +  testWithMultipleSer("empty data stream")(emptyDataStream)
     
    -  test("few elements per partition with java ser") {
    -    fewElementsPerPartition(createSparkConf(false, false))
    -  }
    +  testWithMultipleSer("few elements per partition")(fewElementsPerPartition)
     
    -  def fewElementsPerPartition(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val ord = implicitly[Ordering[Int]]
    -    val elements = Set((1, 1), (2, 2), (5, 5))
    -    val expected = Set(
    -      (0, Set()), (1, Set((1, 1))), (2, Set((2, 2))), (3, Set()), (4, Set()),
    -      (5, Set((5, 5))), (6, Set()))
    -
    -    // Both aggregator and ordering
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
    -    sorter.insertAll(elements.iterator)
    -    assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter.stop()
    -
    -    // Only aggregator
    -    val sorter2 = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(7)), None, None)
    -    sorter2.insertAll(elements.iterator)
    -    assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter2.stop()
    +  testWithMultipleSer("empty partitions with spilling")(emptyPartitionsWithSpilling)
     
    -    // Only ordering
    -    val sorter3 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(7)), Some(ord), None)
    -    sorter3.insertAll(elements.iterator)
    -    assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter3.stop()
    -
    -    // Neither aggregator nor ordering
    -    val sorter4 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(7)), None, None)
    -    sorter4.insertAll(elements.iterator)
    -    assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter4.stop()
    -  }
    -
    -  test("empty partitions with spilling with kryo ser") {
    -    emptyPartitionsWithSpilling(createSparkConf(false, true))
    +  // Load defaults, otherwise SPARK_HOME is not found
    +  testWithMultipleSer("spilling in local cluster", loadDefaults = true) {
    +    (conf: SparkConf) => testSpillingInLocalCluster(conf, 2)
       }
     
    -  test("empty partitions with spilling with java ser") {
    -    emptyPartitionsWithSpilling(createSparkConf(false, false))
    -  }
    -
    -  def emptyPartitionsWithSpilling(conf: SparkConf) {
    -    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val ord = implicitly[Ordering[Int]]
    -    val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2))
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(7)), Some(ord), None)
    -    sorter.insertAll(elements)
    -    assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled
    -    val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
    -    assert(iter.next() === (0, Nil))
    -    assert(iter.next() === (1, List((1, 1))))
    -    assert(iter.next() === (2, (0 until 100000).map(x => (2, 2)).toList))
    -    assert(iter.next() === (3, Nil))
    -    assert(iter.next() === (4, Nil))
    -    assert(iter.next() === (5, List((5, 5))))
    -    assert(iter.next() === (6, Nil))
    -    sorter.stop()
    -  }
    -
    -  test("spilling in local cluster with kryo ser") {
    -    // Load defaults, otherwise SPARK_HOME is not found
    -    testSpillingInLocalCluster(createSparkConf(true, true))
    -  }
    -
    -  test("spilling in local cluster with java ser") {
    -    // Load defaults, otherwise SPARK_HOME is not found
    -    testSpillingInLocalCluster(createSparkConf(true, false))
    -  }
    -
    -  def testSpillingInLocalCluster(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    -
    -    // reduceByKey - should spill ~8 times
    -    val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
    -    val resultA = rddA.reduceByKey(math.max).collect()
    -    assert(resultA.length == 50000)
    -    resultA.foreach { case(k, v) =>
    -      if (v != k * 2 + 1) {
    -        fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
    -      }
    -    }
    -
    -    // groupByKey - should spill ~17 times
    -    val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultB = rddB.groupByKey().collect()
    -    assert(resultB.length == 25000)
    -    resultB.foreach { case(i, seq) =>
    -      val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
    -      if (seq.toSet != expected) {
    -        fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
    -      }
    -    }
    -
    -    // cogroup - should spill ~7 times
    -    val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
    -    val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
    -    val resultC = rddC1.cogroup(rddC2).collect()
    -    assert(resultC.length == 10000)
    -    resultC.foreach { case(i, (seq1, seq2)) =>
    -      i match {
    -        case 0 =>
    -          assert(seq1.toSet == Set[Int](0))
    -          assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
    -        case 1 =>
    -          assert(seq1.toSet == Set[Int](1))
    -          assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
    -        case 5000 =>
    -          assert(seq1.toSet == Set[Int](5000))
    -          assert(seq2.toSet == Set[Int]())
    -        case 9999 =>
    -          assert(seq1.toSet == Set[Int](9999))
    -          assert(seq2.toSet == Set[Int]())
    -        case _ =>
    -      }
    -    }
    -
    -    // larger cogroup - should spill ~7 times
    -    val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val resultD = rddD1.cogroup(rddD2).collect()
    -    assert(resultD.length == 5000)
    -    resultD.foreach { case(i, (seq1, seq2)) =>
    -      val expected = Set(i * 2, i * 2 + 1)
    -      if (seq1.toSet != expected) {
    -        fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}")
    -      }
    -      if (seq2.toSet != expected) {
    -        fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
    -      }
    -    }
    -
    -    // sortByKey - should spill ~17 times
    -    val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultE = rddE.sortByKey().collect().toSeq
    -    assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq)
    -  }
    -
    -  test("spilling in local cluster with many reduce tasks with kryo ser") {
    -    spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, true))
    -  }
    -
    -  test("spilling in local cluster with many reduce tasks with java ser") {
    -    spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, false))
    -  }
    -
    -  def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
    -
    -    // reduceByKey - should spill ~4 times per executor
    -    val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
    -    val resultA = rddA.reduceByKey(math.max _, 100).collect()
    -    assert(resultA.length == 50000)
    -    resultA.foreach { case(k, v) =>
    -      if (v != k * 2 + 1) {
    -        fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
    -      }
    -    }
    -
    -    // groupByKey - should spill ~8 times per executor
    -    val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultB = rddB.groupByKey(100).collect()
    -    assert(resultB.length == 25000)
    -    resultB.foreach { case(i, seq) =>
    -      val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
    -      if (seq.toSet != expected) {
    -        fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
    -      }
    -    }
    -
    -    // cogroup - should spill ~4 times per executor
    -    val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
    -    val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
    -    val resultC = rddC1.cogroup(rddC2, 100).collect()
    -    assert(resultC.length == 10000)
    -    resultC.foreach { case(i, (seq1, seq2)) =>
    -      i match {
    -        case 0 =>
    -          assert(seq1.toSet == Set[Int](0))
    -          assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
    -        case 1 =>
    -          assert(seq1.toSet == Set[Int](1))
    -          assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
    -        case 5000 =>
    -          assert(seq1.toSet == Set[Int](5000))
    -          assert(seq2.toSet == Set[Int]())
    -        case 9999 =>
    -          assert(seq1.toSet == Set[Int](9999))
    -          assert(seq2.toSet == Set[Int]())
    -        case _ =>
    -      }
    -    }
    -
    -    // larger cogroup - should spill ~4 times per executor
    -    val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val resultD = rddD1.cogroup(rddD2).collect()
    -    assert(resultD.length == 5000)
    -    resultD.foreach { case(i, (seq1, seq2)) =>
    -      val expected = Set(i * 2, i * 2 + 1)
    -      if (seq1.toSet != expected) {
    -        fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}")
    -      }
    -      if (seq2.toSet != expected) {
    -        fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
    -      }
    -    }
    -
    -    // sortByKey - should spill ~8 times per executor
    -    val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultE = rddE.sortByKey().collect().toSeq
    -    assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq)
    +  testWithMultipleSer("spilling in local cluster with many reduce tasks", loadDefaults = true) {
    +    (conf: SparkConf) => testSpillingInLocalCluster(conf, 100)
       }
     
       test("cleanup of intermediate files in sorter") {
    -    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val ord = implicitly[Ordering[Int]]
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter.insertAll((0 until 120000).iterator.map(i => (i, i)))
    -    assert(diskBlockManager.getAllFiles().length > 0)
    -    sorter.stop()
    -    assert(diskBlockManager.getAllBlocks().length === 0)
    -
    -    val sorter2 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter2.insertAll((0 until 120000).iterator.map(i => (i, i)))
    -    assert(diskBlockManager.getAllFiles().length > 0)
    -    assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet)
    -    sorter2.stop()
    -    assert(diskBlockManager.getAllBlocks().length === 0)
    +    cleanupIntermediateFilesInSorter(withFailures = false)
       }
     
    -  test("cleanup of intermediate files in sorter if there are errors") {
    -    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val ord = implicitly[Ordering[Int]]
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    intercept[SparkException] {
    -      sorter.insertAll((0 until 120000).iterator.map(i => {
    -        if (i == 119990) {
    -          throw new SparkException("Intentional failure")
    -        }
    -        (i, i)
    -      }))
    -    }
    -    assert(diskBlockManager.getAllFiles().length > 0)
    -    sorter.stop()
    -    assert(diskBlockManager.getAllBlocks().length === 0)
    +  test("cleanup of intermediate files in sorter with failures") {
    +    cleanupIntermediateFilesInSorter(withFailures = true)
       }
     
       test("cleanup of intermediate files in shuffle") {
    -    val conf = createSparkConf(false, false)
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val data = sc.parallelize(0 until 100000, 2).map(i => (i, i))
    -    assert(data.reduceByKey(_ + _).count() === 100000)
    -
    -    // After the shuffle, there should be only 4 files on disk: our two map output files and
    -    // their index files. All other intermediate files should've been deleted.
    -    assert(diskBlockManager.getAllFiles().length === 4)
    -  }
    -
    -  test("cleanup of intermediate files in shuffle with errors") {
    -    val conf = createSparkConf(false, false)
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val data = sc.parallelize(0 until 100000, 2).map(i => {
    -      if (i == 99990) {
    -        throw new Exception("Intentional failure")
    -      }
    -      (i, i)
    -    })
    -    intercept[SparkException] {
    -      data.reduceByKey(_ + _).count()
    -    }
    -
    -    // After the shuffle, there should be only 2 files on disk: the output of task 1 and its index.
    -    // All other files (map 2's output and intermediate merge files) should've been deleted.
    -    assert(diskBlockManager.getAllFiles().length === 2)
    -  }
    -
    -  test("no partial aggregation or sorting with kryo ser") {
    -    noPartialAggregationOrSorting(createSparkConf(false, true))
    -  }
    -
    -  test("no partial aggregation or sorting with java ser") {
    -    noPartialAggregationOrSorting(createSparkConf(false, false))
    -  }
    -
    -  def noPartialAggregationOrSorting(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
    -    sorter.insertAll((0 until 100000).iterator.map(i => (i / 4, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 100000).map(i => (i / 4, i)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    -  }
    -
    -  test("partial aggregation without spill with kryo ser") {
    -    partialAggregationWithoutSpill(createSparkConf(false, true))
    -  }
    -
    -  test("partial aggregation without spill with java ser") {
    -    partialAggregationWithoutSpill(createSparkConf(false, false))
    -  }
    -
    -  def partialAggregationWithoutSpill(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
    -    sorter.insertAll((0 until 100).iterator.map(i => (i / 2, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 50).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    +    cleanupIntermediateFilesInShuffle(withFailures = false)
       }
     
    -  test("partial aggregation with spill, no ordering with kryo ser") {
    -    partialAggregationWIthSpillNoOrdering(createSparkConf(false, true))
    +  test("cleanup of intermediate files in shuffle with failures") {
    +    cleanupIntermediateFilesInShuffle(withFailures = true)
       }
     
    -  test("partial aggregation with spill, no ordering with java ser") {
    -    partialAggregationWIthSpillNoOrdering(createSparkConf(false, false))
    +  testWithMultipleSer("no sorting or partial aggregation") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = false, withOrdering = false, withSpilling = false)
       }
     
    -  def partialAggregationWIthSpillNoOrdering(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
    -    sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    +  testWithMultipleSer("no sorting or partial aggregation with spilling") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = false, withOrdering = false, withSpilling = true)
       }
     
    -  test("partial aggregation with spill, with ordering with kryo ser") {
    -    partialAggregationWithSpillWithOrdering(createSparkConf(false, true))
    +  testWithMultipleSer("sorting, no partial aggregation") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = false, withOrdering = true, withSpilling = false)
       }
     
    -
    -  test("partial aggregation with spill, with ordering with java ser") {
    -    partialAggregationWithSpillWithOrdering(createSparkConf(false, false))
    +  testWithMultipleSer("sorting, no partial aggregation with spilling") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = false, withOrdering = true, withSpilling = true)
       }
     
    -  def partialAggregationWithSpillWithOrdering(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val ord = implicitly[Ordering[Int]]
    -    val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
    -
    -    // avoid combine before spill
    -    sorter.insertAll((0 until 50000).iterator.map(i => (i , 2 * i)))
    -    sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i + 1)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    +  testWithMultipleSer("partial aggregation, no sorting") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = true, withOrdering = false, withSpilling = false)
       }
     
    -  test("sorting without aggregation, no spill with kryo ser") {
    -    sortingWithoutAggregationNoSpill(createSparkConf(false, true))
    +  testWithMultipleSer("partial aggregation, no sorting with spilling") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = true, withOrdering = false, withSpilling = true)
       }
     
    -  test("sorting without aggregation, no spill with java ser") {
    -    sortingWithoutAggregationNoSpill(createSparkConf(false, false))
    +  testWithMultipleSer("partial aggregation and sorting") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = true, withOrdering = true, withSpilling = false)
       }
     
    -  def sortingWithoutAggregationNoSpill(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val ord = implicitly[Ordering[Int]]
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter.insertAll((0 until 100).iterator.map(i => (i, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 100).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
    -    }).toSeq
    -    assert(results === expected)
    -  }
    -
    -  test("sorting without aggregation, with spill with kryo ser") {
    -    sortingWithoutAggregationWithSpill(createSparkConf(false, true))
    -  }
    -
    -  test("sorting without aggregation, with spill with java ser") {
    -    sortingWithoutAggregationWithSpill(createSparkConf(false, false))
    +  testWithMultipleSer("partial aggregation and sorting with spilling") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = true, withOrdering = true, withSpilling = true)
       }
     
    -  def sortingWithoutAggregationWithSpill(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val ord = implicitly[Ordering[Int]]
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 100000).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
    -    }).toSeq
    -    assert(results === expected)
    -  }
    +  testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)(
    +    sortWithoutBreakingSortingContracts)
     
       test("spilling with hash collisions") {
    -    val conf = createSparkConf(true, false)
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true, kryo = false)
    +    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
         sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     
         def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
         def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
         def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String])
    -      : ArrayBuffer[String] = buffer1 ++= buffer2
    +    : ArrayBuffer[String] = buffer1 ++= buffer2
    --- End diff --
    
    oops I screwed that up


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148526820
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #9124: [SPARK-11078] Ensure spilling tests actually spill

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the issue:

    https://github.com/apache/spark/pull/9124
  
    I see, that's possible. The right thing to do here is to add a `wait` of some sort in the listener to block until we have received the stage completed event. We've done this in some other test listeners as well. (I can't remember which ones off the top of my head).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148245856
  
      [Test build #43747 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43747/console) for   PR 9124 at commit [`1b7fa3d`](https://github.com/apache/spark/commit/1b7fa3d6d32b1e254a47706db7eccd915c7368aa).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148221886
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148524964
  
    LGTM over all


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148223975
  
      [Test build #43747 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43747/consoleFull) for   PR 9124 at commit [`1b7fa3d`](https://github.com/apache/spark/commit/1b7fa3d6d32b1e254a47706db7eccd915c7368aa).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148526833
  
    Merged build started.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148246056
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148528504
  
      [Test build #43812 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43812/consoleFull) for   PR 9124 at commit [`7590b77`](https://github.com/apache/spark/commit/7590b7747750a053f3253656d97dbeb7f38b8f80).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by andrewor14 <gi...@git.apache.org>.
Github user andrewor14 commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148531647
  
    OK, merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148221865
  
     Merged build triggered.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/9124#discussion_r42182171
  
    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala ---
    @@ -18,535 +18,91 @@
     package org.apache.spark.util.collection
     
     import scala.collection.mutable.ArrayBuffer
    -
     import scala.util.Random
     
     import org.apache.spark._
     import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
     
    -// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
     
     class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
    -  private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
    -    val conf = new SparkConf(loadDefaults)
    -    if (kryo) {
    -      conf.set("spark.serializer", classOf[KryoSerializer].getName)
    -    } else {
    -      // Make the Java serializer write a reset instruction (TC_RESET) after each object to test
    -      // for a bug we had with bytes written past the last object in a batch (SPARK-2792)
    -      conf.set("spark.serializer.objectStreamReset", "1")
    -      conf.set("spark.serializer", classOf[JavaSerializer].getName)
    -    }
    -    conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
    -    // Ensure that we actually have multiple batches per spill file
    -    conf.set("spark.shuffle.spill.batchSize", "10")
    -    conf.set("spark.testing.memory", "2000000")
    -    conf
    -  }
    -
    -  test("empty data stream with kryo ser") {
    -    emptyDataStream(createSparkConf(false, true))
    -  }
    -
    -  test("empty data stream with java ser") {
    -    emptyDataStream(createSparkConf(false, false))
    -  }
    -
    -  def emptyDataStream(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val ord = implicitly[Ordering[Int]]
    -
    -    // Both aggregator and ordering
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
    -    assert(sorter.iterator.toSeq === Seq())
    -    sorter.stop()
    -
    -    // Only aggregator
    -    val sorter2 = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(3)), None, None)
    -    assert(sorter2.iterator.toSeq === Seq())
    -    sorter2.stop()
    -
    -    // Only ordering
    -    val sorter3 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    assert(sorter3.iterator.toSeq === Seq())
    -    sorter3.stop()
    -
    -    // Neither aggregator nor ordering
    -    val sorter4 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), None, None)
    -    assert(sorter4.iterator.toSeq === Seq())
    -    sorter4.stop()
    -  }
    +  import TestUtils.{assertNotSpilled, assertSpilled}
     
    -  test("few elements per partition with kryo ser") {
    -    fewElementsPerPartition(createSparkConf(false, true))
    -  }
    +  testWithMultipleSer("empty data stream")(emptyDataStream)
     
    -  test("few elements per partition with java ser") {
    -    fewElementsPerPartition(createSparkConf(false, false))
    -  }
    +  testWithMultipleSer("few elements per partition")(fewElementsPerPartition)
     
    -  def fewElementsPerPartition(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val ord = implicitly[Ordering[Int]]
    -    val elements = Set((1, 1), (2, 2), (5, 5))
    -    val expected = Set(
    -      (0, Set()), (1, Set((1, 1))), (2, Set((2, 2))), (3, Set()), (4, Set()),
    -      (5, Set((5, 5))), (6, Set()))
    -
    -    // Both aggregator and ordering
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(7)), Some(ord), None)
    -    sorter.insertAll(elements.iterator)
    -    assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter.stop()
    -
    -    // Only aggregator
    -    val sorter2 = new ExternalSorter[Int, Int, Int](
    -      Some(agg), Some(new HashPartitioner(7)), None, None)
    -    sorter2.insertAll(elements.iterator)
    -    assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter2.stop()
    +  testWithMultipleSer("empty partitions with spilling")(emptyPartitionsWithSpilling)
     
    -    // Only ordering
    -    val sorter3 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(7)), Some(ord), None)
    -    sorter3.insertAll(elements.iterator)
    -    assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter3.stop()
    -
    -    // Neither aggregator nor ordering
    -    val sorter4 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(7)), None, None)
    -    sorter4.insertAll(elements.iterator)
    -    assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected)
    -    sorter4.stop()
    -  }
    -
    -  test("empty partitions with spilling with kryo ser") {
    -    emptyPartitionsWithSpilling(createSparkConf(false, true))
    +  // Load defaults, otherwise SPARK_HOME is not found
    +  testWithMultipleSer("spilling in local cluster", loadDefaults = true) {
    +    (conf: SparkConf) => testSpillingInLocalCluster(conf, 2)
       }
     
    -  test("empty partitions with spilling with java ser") {
    -    emptyPartitionsWithSpilling(createSparkConf(false, false))
    -  }
    -
    -  def emptyPartitionsWithSpilling(conf: SparkConf) {
    -    conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val ord = implicitly[Ordering[Int]]
    -    val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2))
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(7)), Some(ord), None)
    -    sorter.insertAll(elements)
    -    assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled
    -    val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList))
    -    assert(iter.next() === (0, Nil))
    -    assert(iter.next() === (1, List((1, 1))))
    -    assert(iter.next() === (2, (0 until 100000).map(x => (2, 2)).toList))
    -    assert(iter.next() === (3, Nil))
    -    assert(iter.next() === (4, Nil))
    -    assert(iter.next() === (5, List((5, 5))))
    -    assert(iter.next() === (6, Nil))
    -    sorter.stop()
    -  }
    -
    -  test("spilling in local cluster with kryo ser") {
    -    // Load defaults, otherwise SPARK_HOME is not found
    -    testSpillingInLocalCluster(createSparkConf(true, true))
    -  }
    -
    -  test("spilling in local cluster with java ser") {
    -    // Load defaults, otherwise SPARK_HOME is not found
    -    testSpillingInLocalCluster(createSparkConf(true, false))
    -  }
    -
    -  def testSpillingInLocalCluster(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    -
    -    // reduceByKey - should spill ~8 times
    -    val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
    -    val resultA = rddA.reduceByKey(math.max).collect()
    -    assert(resultA.length == 50000)
    -    resultA.foreach { case(k, v) =>
    -      if (v != k * 2 + 1) {
    -        fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
    -      }
    -    }
    -
    -    // groupByKey - should spill ~17 times
    -    val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultB = rddB.groupByKey().collect()
    -    assert(resultB.length == 25000)
    -    resultB.foreach { case(i, seq) =>
    -      val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
    -      if (seq.toSet != expected) {
    -        fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
    -      }
    -    }
    -
    -    // cogroup - should spill ~7 times
    -    val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
    -    val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
    -    val resultC = rddC1.cogroup(rddC2).collect()
    -    assert(resultC.length == 10000)
    -    resultC.foreach { case(i, (seq1, seq2)) =>
    -      i match {
    -        case 0 =>
    -          assert(seq1.toSet == Set[Int](0))
    -          assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
    -        case 1 =>
    -          assert(seq1.toSet == Set[Int](1))
    -          assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
    -        case 5000 =>
    -          assert(seq1.toSet == Set[Int](5000))
    -          assert(seq2.toSet == Set[Int]())
    -        case 9999 =>
    -          assert(seq1.toSet == Set[Int](9999))
    -          assert(seq2.toSet == Set[Int]())
    -        case _ =>
    -      }
    -    }
    -
    -    // larger cogroup - should spill ~7 times
    -    val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val resultD = rddD1.cogroup(rddD2).collect()
    -    assert(resultD.length == 5000)
    -    resultD.foreach { case(i, (seq1, seq2)) =>
    -      val expected = Set(i * 2, i * 2 + 1)
    -      if (seq1.toSet != expected) {
    -        fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}")
    -      }
    -      if (seq2.toSet != expected) {
    -        fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
    -      }
    -    }
    -
    -    // sortByKey - should spill ~17 times
    -    val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultE = rddE.sortByKey().collect().toSeq
    -    assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq)
    -  }
    -
    -  test("spilling in local cluster with many reduce tasks with kryo ser") {
    -    spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, true))
    -  }
    -
    -  test("spilling in local cluster with many reduce tasks with java ser") {
    -    spillingInLocalClusterWithManyReduceTasks(createSparkConf(true, false))
    -  }
    -
    -  def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
    -
    -    // reduceByKey - should spill ~4 times per executor
    -    val rddA = sc.parallelize(0 until 100000).map(i => (i/2, i))
    -    val resultA = rddA.reduceByKey(math.max _, 100).collect()
    -    assert(resultA.length == 50000)
    -    resultA.foreach { case(k, v) =>
    -      if (v != k * 2 + 1) {
    -        fail(s"Value for ${k} was wrong: expected ${k * 2 + 1}, got ${v}")
    -      }
    -    }
    -
    -    // groupByKey - should spill ~8 times per executor
    -    val rddB = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultB = rddB.groupByKey(100).collect()
    -    assert(resultB.length == 25000)
    -    resultB.foreach { case(i, seq) =>
    -      val expected = Set(i * 4, i * 4 + 1, i * 4 + 2, i * 4 + 3)
    -      if (seq.toSet != expected) {
    -        fail(s"Value for ${i} was wrong: expected ${expected}, got ${seq.toSet}")
    -      }
    -    }
    -
    -    // cogroup - should spill ~4 times per executor
    -    val rddC1 = sc.parallelize(0 until 10000).map(i => (i, i))
    -    val rddC2 = sc.parallelize(0 until 10000).map(i => (i%1000, i))
    -    val resultC = rddC1.cogroup(rddC2, 100).collect()
    -    assert(resultC.length == 10000)
    -    resultC.foreach { case(i, (seq1, seq2)) =>
    -      i match {
    -        case 0 =>
    -          assert(seq1.toSet == Set[Int](0))
    -          assert(seq2.toSet == Set[Int](0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000))
    -        case 1 =>
    -          assert(seq1.toSet == Set[Int](1))
    -          assert(seq2.toSet == Set[Int](1, 1001, 2001, 3001, 4001, 5001, 6001, 7001, 8001, 9001))
    -        case 5000 =>
    -          assert(seq1.toSet == Set[Int](5000))
    -          assert(seq2.toSet == Set[Int]())
    -        case 9999 =>
    -          assert(seq1.toSet == Set[Int](9999))
    -          assert(seq2.toSet == Set[Int]())
    -        case _ =>
    -      }
    -    }
    -
    -    // larger cogroup - should spill ~4 times per executor
    -    val rddD1 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val rddD2 = sc.parallelize(0 until 10000).map(i => (i/2, i))
    -    val resultD = rddD1.cogroup(rddD2).collect()
    -    assert(resultD.length == 5000)
    -    resultD.foreach { case(i, (seq1, seq2)) =>
    -      val expected = Set(i * 2, i * 2 + 1)
    -      if (seq1.toSet != expected) {
    -        fail(s"Value 1 for ${i} was wrong: expected ${expected}, got ${seq1.toSet}")
    -      }
    -      if (seq2.toSet != expected) {
    -        fail(s"Value 2 for ${i} was wrong: expected ${expected}, got ${seq2.toSet}")
    -      }
    -    }
    -
    -    // sortByKey - should spill ~8 times per executor
    -    val rddE = sc.parallelize(0 until 100000).map(i => (i/4, i))
    -    val resultE = rddE.sortByKey().collect().toSeq
    -    assert(resultE === (0 until 100000).map(i => (i/4, i)).toSeq)
    +  testWithMultipleSer("spilling in local cluster with many reduce tasks", loadDefaults = true) {
    +    (conf: SparkConf) => testSpillingInLocalCluster(conf, 100)
       }
     
       test("cleanup of intermediate files in sorter") {
    -    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val ord = implicitly[Ordering[Int]]
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter.insertAll((0 until 120000).iterator.map(i => (i, i)))
    -    assert(diskBlockManager.getAllFiles().length > 0)
    -    sorter.stop()
    -    assert(diskBlockManager.getAllBlocks().length === 0)
    -
    -    val sorter2 = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter2.insertAll((0 until 120000).iterator.map(i => (i, i)))
    -    assert(diskBlockManager.getAllFiles().length > 0)
    -    assert(sorter2.iterator.toSet === (0 until 120000).map(i => (i, i)).toSet)
    -    sorter2.stop()
    -    assert(diskBlockManager.getAllBlocks().length === 0)
    +    cleanupIntermediateFilesInSorter(withFailures = false)
       }
     
    -  test("cleanup of intermediate files in sorter if there are errors") {
    -    val conf = createSparkConf(true, false)  // Load defaults, otherwise SPARK_HOME is not found
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val ord = implicitly[Ordering[Int]]
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    intercept[SparkException] {
    -      sorter.insertAll((0 until 120000).iterator.map(i => {
    -        if (i == 119990) {
    -          throw new SparkException("Intentional failure")
    -        }
    -        (i, i)
    -      }))
    -    }
    -    assert(diskBlockManager.getAllFiles().length > 0)
    -    sorter.stop()
    -    assert(diskBlockManager.getAllBlocks().length === 0)
    +  test("cleanup of intermediate files in sorter with failures") {
    +    cleanupIntermediateFilesInSorter(withFailures = true)
       }
     
       test("cleanup of intermediate files in shuffle") {
    -    val conf = createSparkConf(false, false)
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val data = sc.parallelize(0 until 100000, 2).map(i => (i, i))
    -    assert(data.reduceByKey(_ + _).count() === 100000)
    -
    -    // After the shuffle, there should be only 4 files on disk: our two map output files and
    -    // their index files. All other intermediate files should've been deleted.
    -    assert(diskBlockManager.getAllFiles().length === 4)
    -  }
    -
    -  test("cleanup of intermediate files in shuffle with errors") {
    -    val conf = createSparkConf(false, false)
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -    val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
    -
    -    val data = sc.parallelize(0 until 100000, 2).map(i => {
    -      if (i == 99990) {
    -        throw new Exception("Intentional failure")
    -      }
    -      (i, i)
    -    })
    -    intercept[SparkException] {
    -      data.reduceByKey(_ + _).count()
    -    }
    -
    -    // After the shuffle, there should be only 2 files on disk: the output of task 1 and its index.
    -    // All other files (map 2's output and intermediate merge files) should've been deleted.
    -    assert(diskBlockManager.getAllFiles().length === 2)
    -  }
    -
    -  test("no partial aggregation or sorting with kryo ser") {
    -    noPartialAggregationOrSorting(createSparkConf(false, true))
    -  }
    -
    -  test("no partial aggregation or sorting with java ser") {
    -    noPartialAggregationOrSorting(createSparkConf(false, false))
    -  }
    -
    -  def noPartialAggregationOrSorting(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None)
    -    sorter.insertAll((0 until 100000).iterator.map(i => (i / 4, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 100000).map(i => (i / 4, i)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    -  }
    -
    -  test("partial aggregation without spill with kryo ser") {
    -    partialAggregationWithoutSpill(createSparkConf(false, true))
    -  }
    -
    -  test("partial aggregation without spill with java ser") {
    -    partialAggregationWithoutSpill(createSparkConf(false, false))
    -  }
    -
    -  def partialAggregationWithoutSpill(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
    -    sorter.insertAll((0 until 100).iterator.map(i => (i / 2, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 50).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    +    cleanupIntermediateFilesInShuffle(withFailures = false)
       }
     
    -  test("partial aggregation with spill, no ordering with kryo ser") {
    -    partialAggregationWIthSpillNoOrdering(createSparkConf(false, true))
    +  test("cleanup of intermediate files in shuffle with failures") {
    +    cleanupIntermediateFilesInShuffle(withFailures = true)
       }
     
    -  test("partial aggregation with spill, no ordering with java ser") {
    -    partialAggregationWIthSpillNoOrdering(createSparkConf(false, false))
    +  testWithMultipleSer("no sorting or partial aggregation") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = false, withOrdering = false, withSpilling = false)
       }
     
    -  def partialAggregationWIthSpillNoOrdering(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None)
    -    sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    +  testWithMultipleSer("no sorting or partial aggregation with spilling") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = false, withOrdering = false, withSpilling = true)
       }
     
    -  test("partial aggregation with spill, with ordering with kryo ser") {
    -    partialAggregationWithSpillWithOrdering(createSparkConf(false, true))
    +  testWithMultipleSer("sorting, no partial aggregation") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = false, withOrdering = true, withSpilling = false)
       }
     
    -
    -  test("partial aggregation with spill, with ordering with java ser") {
    -    partialAggregationWithSpillWithOrdering(createSparkConf(false, false))
    +  testWithMultipleSer("sorting, no partial aggregation with spilling") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = false, withOrdering = true, withSpilling = true)
       }
     
    -  def partialAggregationWithSpillWithOrdering(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j)
    -    val ord = implicitly[Ordering[Int]]
    -    val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None)
    -
    -    // avoid combine before spill
    -    sorter.insertAll((0 until 50000).iterator.map(i => (i , 2 * i)))
    -    sorter.insertAll((0 until 50000).iterator.map(i => (i, 2 * i + 1)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet)
    -    }).toSet
    -    assert(results === expected)
    +  testWithMultipleSer("partial aggregation, no sorting") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = true, withOrdering = false, withSpilling = false)
       }
     
    -  test("sorting without aggregation, no spill with kryo ser") {
    -    sortingWithoutAggregationNoSpill(createSparkConf(false, true))
    +  testWithMultipleSer("partial aggregation, no sorting with spilling") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = true, withOrdering = false, withSpilling = true)
       }
     
    -  test("sorting without aggregation, no spill with java ser") {
    -    sortingWithoutAggregationNoSpill(createSparkConf(false, false))
    +  testWithMultipleSer("partial aggregation and sorting") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = true, withOrdering = true, withSpilling = false)
       }
     
    -  def sortingWithoutAggregationNoSpill(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val ord = implicitly[Ordering[Int]]
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter.insertAll((0 until 100).iterator.map(i => (i, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 100).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
    -    }).toSeq
    -    assert(results === expected)
    -  }
    -
    -  test("sorting without aggregation, with spill with kryo ser") {
    -    sortingWithoutAggregationWithSpill(createSparkConf(false, true))
    -  }
    -
    -  test("sorting without aggregation, with spill with java ser") {
    -    sortingWithoutAggregationWithSpill(createSparkConf(false, false))
    +  testWithMultipleSer("partial aggregation and sorting with spilling") { (conf: SparkConf) =>
    +    basicSorterTest(conf, withPartialAgg = true, withOrdering = true, withSpilling = true)
       }
     
    -  def sortingWithoutAggregationWithSpill(conf: SparkConf) {
    -    conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
    -    sc = new SparkContext("local", "test", conf)
    -
    -    val ord = implicitly[Ordering[Int]]
    -    val sorter = new ExternalSorter[Int, Int, Int](
    -      None, Some(new HashPartitioner(3)), Some(ord), None)
    -    sorter.insertAll((0 until 100000).iterator.map(i => (i, i)))
    -    val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq
    -    val expected = (0 until 3).map(p => {
    -      (p, (0 until 100000).map(i => (i, i)).filter(_._1 % 3 == p).toSeq)
    -    }).toSeq
    -    assert(results === expected)
    -  }
    +  testWithMultipleSer("sort without breaking sorting contracts", loadDefaults = true)(
    +    sortWithoutBreakingSortingContracts)
     
       test("spilling with hash collisions") {
    -    val conf = createSparkConf(true, false)
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true, kryo = false)
    +    conf.set("spark.shuffle.spill.numElementsForceSpillThreshold", (size / 2).toString)
         sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
     
         def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
         def mergeValue(buffer: ArrayBuffer[String], i: String): ArrayBuffer[String] = buffer += i
         def mergeCombiners(buffer1: ArrayBuffer[String], buffer2: ArrayBuffer[String])
    -      : ArrayBuffer[String] = buffer1 ++= buffer2
    +    : ArrayBuffer[String] = buffer1 ++= buffer2
    --- End diff --
    
    nit:
    ```
    def mergeCombiners(
        buffer1: ArrayBuffer[String],
        buffer2: ArrayBuffer[String]): ArrayBuffer[String]
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148246061
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43747/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148562387
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43812/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/9124


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148562203
  
      [Test build #43812 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/43812/console) for   PR 9124 at commit [`7590b77`](https://github.com/apache/spark/commit/7590b7747750a053f3253656d97dbeb7f38b8f80).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-11078] Ensure spilling tests actually s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/9124#issuecomment-148263547
  
      [Test build #1904 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1904/consoleFull) for   PR 9124 at commit [`1b7fa3d`](https://github.com/apache/spark/commit/1b7fa3d6d32b1e254a47706db7eccd915c7368aa).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org