Posted to reviews@spark.apache.org by advancedxy <gi...@git.apache.org> on 2018/01/31 06:31:45 UTC

[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

GitHub user advancedxy opened a pull request:

    https://github.com/apache/spark/pull/20449

    [SPARK-23040][CORE]: Returns interruptible iterator for shuffle reader

    ## What changes were proposed in this pull request?
    
    Before this commit, the shuffle reader returned a non-interruptible iterator if an aggregator or ordering was specified.
    This commit also ensures that the sorter is closed even when the task is cancelled (killed) in the middle of sorting.
    
    ## How was this patch tested?
    
    Added a unit test to JobCancellationSuite.
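
To illustrate the idea behind the description above, here is a minimal sketch of an iterator that checks a cancellation flag before yielding more elements. It does not use Spark: `CancelFlag`, `TaskCancelledException`, `CheckedIterator` and `CheckedIteratorDemo` are hypothetical names invented for this example, and the real `InterruptibleIterator` and `TaskContext` may differ in detail.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a task context; it only carries a cancellation flag.
final class CancelFlag {
  private val killed = new AtomicBoolean(false)
  def kill(): Unit = killed.set(true)
  def isKilled: Boolean = killed.get()
}

// Hypothetical exception raised when iteration continues after cancellation.
final class TaskCancelledException extends RuntimeException("task was cancelled")

// Wraps a delegate iterator and checks the flag on every hasNext call.
final class CheckedIterator[T](flag: CancelFlag, delegate: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = {
    if (flag.isKilled) throw new TaskCancelledException()
    delegate.hasNext
  }
  override def next(): T = delegate.next()
}

object CheckedIteratorDemo {
  // Consumes a sorted sequence, sets the flag after `cancelAfter` elements,
  // and returns how many elements were consumed in total.
  def consumeUntilCancelled(n: Int, cancelAfter: Int): Int = {
    val flag = new CancelFlag
    // Sorting materializes the input first, so a check placed upstream of the
    // sort would no longer be consulted while the sorted output is consumed.
    val sorted = (1 to n).toArray.sorted.iterator
    val it = new CheckedIterator(flag, sorted)
    var consumed = 0
    try {
      while (it.hasNext) {
        it.next()
        consumed += 1
        if (consumed == cancelAfter) flag.kill()
      }
    } catch {
      case _: TaskCancelledException => // iteration stopped early, as intended
    }
    consumed
  }
}
```

Wrapping the final iterator, rather than only the input of the sort, is what lets the consumer stop soon after the flag is set.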

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/advancedxy/spark SPARK-23040

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20449.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20449
    
----
commit acca0e3da2da21e4e184f01b4e8f7b6b8c05ee1d
Author: Xianjin YE <ad...@...>
Date:   2018-01-31T06:27:21Z

    [SPARK-23040][CORE]: Returns interruptible iterator for shuffle reader
    
    Before this commit, a non-interruptible iterator is returned if
    aggregator or ordering is specified.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87725 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87725/testReport)** for PR 20449 at commit [`8c15c56`](https://github.com/apache/spark/commit/8c15c564c7d2d0adc0cfd725e34dbd359c6a0ab6).


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r165051362
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,18 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was cancelled.
    +        context.addTaskCompletionListener(tc => {
    +          // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
    +          // CompletionIterator. Another way would be making sorter.stop idempotent.
    +          if (tc.isInterrupted()) { sorter.stop() }
    --- End diff --
    
    It seems we can remove this `if` if we don't return a `CompletionIterator`.
    
    BTW, I think we need to check all the places that use `CompletionIterator` to see whether they handle job cancellation.
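
The diff comment above mentions making `sorter.stop` idempotent as an alternative to the `isInterrupted` check. A minimal sketch of that idea follows, assuming a hypothetical `StoppableResource` class rather than Spark's actual sorter; the names `StoppableResource` and `IdempotentStopDemo` are invented for illustration.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical resource; this is not Spark's sorter.
final class StoppableResource {
  private val stopped = new AtomicBoolean(false)
  @volatile private var cleanups = 0

  // Only the first call performs the cleanup; later calls are no-ops.
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      cleanups += 1
    }
  }

  def cleanupCount: Int = cleanups
}

object IdempotentStopDemo {
  def run(): Int = {
    val r = new StoppableResource
    r.stop() // e.g. invoked when the iterator is exhausted
    r.stop() // e.g. invoked again from a task completion callback
    r.cleanupCount
  }
}
```

With a guard like this, a completion callback could call `stop()` unconditionally, whether or not the iterator had already triggered it.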


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    thanks, merging to master!


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r166877282
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,41 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    import JobCancellationSuite._
    +    sc = new SparkContext("local[2]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, 2).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        // Small delay to ensure that foreach is cancelled if task is killed
    +        Thread.sleep(1000)
    --- End diff --
    
    +1


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87703 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87703/testReport)** for PR 20449 at commit [`ba2f355`](https://github.com/apache/spark/commit/ba2f355dca21f1baa7cad82199402dcec1798584).


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87813 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87813/testReport)** for PR 20449 at commit [`756e0b7`](https://github.com/apache/spark/commit/756e0b7336fff3c72eca70c2ab489600211b9253).


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20449


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r171486224
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -40,6 +41,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       override def afterEach() {
         try {
           resetSparkContext()
    +      // Reset semaphores if used by multiple tests.
    +      // Note: if other semaphores are shared by multiple tests, please reset them in this block
    +      JobCancellationSuite.taskStartedSemaphore.drainPermits()
    +      JobCancellationSuite.taskCancelledSemaphore.drainPermits()
    --- End diff --
    
    or we can make all semaphores local, so that we don't need to care about it.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    @jerryshao @cloud-fan I have updated my code. Do you have any other concerns?


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170820691
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,63 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    --- End diff --
    
    This will be called twice as the root RDD has 2 partitions, so `f.cancel` might be called before both of these 2 partitions have finished.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    retest this please


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r171489737
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -40,6 +41,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       override def afterEach() {
         try {
           resetSparkContext()
    +      // Reset semaphores if used by multiple tests.
    +      // Note: if other semaphores are shared by multiple tests, please reset them in this block
    +      JobCancellationSuite.taskStartedSemaphore.drainPermits()
    +      JobCancellationSuite.taskCancelledSemaphore.drainPermits()
    --- End diff --
    
    >for simplicity, I'd like to reset all semaphores here, instead of thinking about which ones are shared.
    
    Another way to avoid this problem is: don't reuse semaphores. But that's too verbose.
    
    As for your suggestion, if new semaphores are added by others, how would they know that they are supposed to reset them? Maybe some comments are needed at the semaphore declarations.
    
    >or we can make all semaphores local, so that we don't need to care about it.
    
    No, a global semaphore is required when it is shared between the driver and an executor (another thread in local mode).
    See the related PR https://github.com/apache/spark/pull/4180 for details.
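
As a rough illustration of why the shared semaphores are drained between tests, the sketch below uses only `java.util.concurrent.Semaphore` and plain threads. `SharedSemaphores` and `DrainPermitsDemo` are hypothetical names, and the worker threads merely stand in for tasks running in local mode.

```scala
import java.util.concurrent.Semaphore

// Hypothetical singleton holding a semaphore that any thread in the JVM can reach.
object SharedSemaphores {
  val taskStarted = new Semaphore(0)
}

object DrainPermitsDemo {
  // Simulates one test: two worker threads each release a permit,
  // but the "driver" side only acquires one of them.
  def runTest(): Unit = {
    val workers = (1 to 2).map { _ =>
      val t = new Thread(new Runnable {
        override def run(): Unit = SharedSemaphores.taskStarted.release()
      })
      t.start()
      t
    }
    SharedSemaphores.taskStarted.acquire()
    workers.foreach(_.join())
  }

  // Number of permits left behind by a test that does not clean up.
  def leftoverAfterTest(): Int = {
    SharedSemaphores.taskStarted.drainPermits() // start from a clean state
    runTest()
    SharedSemaphores.taskStarted.availablePermits()
  }

  // Number of permits left once the semaphore is drained, as in afterEach().
  def leftoverAfterReset(): Int = {
    runTest()
    SharedSemaphores.taskStarted.drainPermits()
    SharedSemaphores.taskStarted.availablePermits()
  }
}
```

A leftover permit would let the next test's `acquire()` return immediately, before its own tasks have started, which is the kind of cross-test interference the `drainPermits()` calls are meant to prevent.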



---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87725 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87725/testReport)** for PR 20449 at commit [`8c15c56`](https://github.com/apache/spark/commit/8c15c564c7d2d0adc0cfd725e34dbd359c6a0ab6).
     * This patch **fails from timeout after a configured wait of `300m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170847978
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    +    f.cancel()
    +
    +    val e = intercept[SparkException] { f.get() }.getCause
    +    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
    +
    +    // Make sure tasks are indeed completed.
    +    taskCompletedSem.acquire(numSlice)
    +    assert(executionOfInterruptibleCounter.get() <= 11)
    --- End diff --
    
    For a unit test, I'd like to have the simplest test that can expose the bug.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    ok to test


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170501479
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -18,15 +18,14 @@
     package org.apache.spark
     
     import java.util.concurrent.Semaphore
    +import java.util.concurrent.atomic.AtomicInteger
     
     import scala.concurrent.ExecutionContext.Implicits.global
     import scala.concurrent.Future
     import scala.concurrent.duration._
    -
     import org.scalatest.BeforeAndAfter
     import org.scalatest.Matchers
    -
    --- End diff --
    
    this will break the style check


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87845 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87845/testReport)** for PR 20449 at commit [`28119e9`](https://github.com/apache/spark/commit/28119e9e191e0c2ec2acd2a12643e6bc00f5cca4).


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r172416019
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was finished/cancelled.
    +        context.addTaskCompletionListener(_ => {
    +          sorter.stop()
    +        })
             CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
           case None =>
             aggregatedIter
         }
    +    // Use another interruptible iterator here to support task cancellation as aggregator or(and)
    +    // sorter may have consumed previous interruptible iterator.
    +    new InterruptibleIterator[Product2[K, C]](context, resultIter)
    --- End diff --
    
    Will do 


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r171153715
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,63 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 1
    --- End diff --
    
    I'm not sure, let's just try it :)


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87822 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87822/testReport)** for PR 20449 at commit [`a3d8ad5`](https://github.com/apache/spark/commit/a3d8ad56f0709c343e508c8b636083243f9ffdd2).


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87701 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87701/testReport)** for PR 20449 at commit [`88e86e0`](https://github.com/apache/spark/commit/88e86e0ef2fc069cb0c6531979b9ae713bc88c90).


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    ping @cloud-fan 


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    cc @jiangxb1987 


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r172414393
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was finished/cancelled.
    +        context.addTaskCompletionListener(_ => {
    +          sorter.stop()
    +        })
             CompletionIterator[Product2[K, C], Iterator[Product2[K, C]]](sorter.iterator, sorter.stop())
           case None =>
             aggregatedIter
         }
    +    // Use another interruptible iterator here to support task cancellation as aggregator or(and)
    +    // sorter may have consumed previous interruptible iterator.
    +    new InterruptibleIterator[Product2[K, C]](context, resultIter)
    --- End diff --
    
    There is a chance that `resultIter` is already an `InterruptibleIterator`, and we should not double-wrap it. Can you send a follow-up PR to fix this? Then we can backport them to 2.3 together.
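
One way to avoid the double wrapping described above is to pattern match on the iterator before wrapping it. The sketch below shows the general technique with a hypothetical `Guarded` wrapper instead of Spark's `InterruptibleIterator`; it is an illustration under that assumption, not the code of the follow-up PR.

```scala
// Hypothetical wrapper standing in for an interruptible iterator.
final class Guarded[T](val delegate: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = delegate.hasNext
  override def next(): T = delegate.next()
}

object WrapOnce {
  // Returns the argument unchanged if it is already wrapped,
  // otherwise wraps it exactly once.
  def wrap[T](iter: Iterator[T]): Guarded[T] = iter match {
    case g: Guarded[T @unchecked] => g
    case other => new Guarded(other)
  }
}
```

Calling `WrapOnce.wrap` on an already wrapped iterator returns the same instance, so the per-element check would not be paid twice.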


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    @advancedxy did you see any issue or exception related to this issue?


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87703 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87703/testReport)** for PR 20449 at commit [`ba2f355`](https://github.com/apache/spark/commit/ba2f355dca21f1baa7cad82199402dcec1798584).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    @cloud-fan I have updated the comments and fixed the style issues (the code was previously auto-formatted by IntelliJ).


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170501977
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +319,55 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    --- End diff --
    
    can we briefly explain what happens in this test?


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87822 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87822/testReport)** for PR 20449 at commit [`a3d8ad5`](https://github.com/apache/spark/commit/a3d8ad56f0709c343e508c8b636083243f9ffdd2).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170842569
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    +    f.cancel()
    +
    +    val e = intercept[SparkException] { f.get() }.getCause
    --- End diff --
    
    nit: `intercept[SparkException](f.get()).getCause`


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87703/
    Test FAILed.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r171152501
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,63 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 1
    --- End diff --
    
    Will update it later. 
    
    But it looks like Jenkins has been having trouble these days. Is it back to normal?


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r166875113
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,41 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    import JobCancellationSuite._
    +    sc = new SparkContext("local[2]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, 2).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        // Small delay to ensure that foreach is cancelled if task is killed
    +        Thread.sleep(1000)
    --- End diff --
    
    I think using `sleep` will make the UT flaky; you should switch to a deterministic approach.
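
A minimal sketch of what a deterministic alternative to `sleep` could look like, assuming plain `java.util.concurrent.Semaphore` and an ordinary thread standing in for a Spark task. The object and method names (`DeterministicHandshake`, `cancelBeforeWork`) are hypothetical and not part of the patch:

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.AtomicInteger

object DeterministicHandshake {
  // Returns how many units of work the worker completed before it was cancelled.
  def cancelBeforeWork(): Int = {
    val started = new Semaphore(0)   // released by the worker once it is running
    val proceed = new Semaphore(0)   // never released here, so the worker stays blocked
    val processed = new AtomicInteger(0)

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        started.release()            // signal: the worker has started
        try {
          proceed.acquire()          // block until allowed to continue, or interrupted
          processed.incrementAndGet()
        } catch {
          case _: InterruptedException => () // cancelled while blocked: do no work
        }
      }
    })

    worker.start()
    started.acquire()                // wait for the start signal instead of sleeping
    worker.interrupt()               // cancel the worker
    worker.join()                    // wait until it has really finished
    processed.get()
  }
}
```

Because the driver waits on `started` rather than on a timer, the outcome does not depend on scheduling delays: the worker can only be cancelled after it has started and before it does any work.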


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87756/
    Test FAILed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87723 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87723/testReport)** for PR 20449 at commit [`d6ed9a1`](https://github.com/apache/spark/commit/d6ed9a15e24c414251ffaf09839eb5b3c0567d75).


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87845 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87845/testReport)** for PR 20449 at commit [`28119e9`](https://github.com/apache/spark/commit/28119e9e191e0c2ec2acd2a12643e6bc00f5cca4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r171485997
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -40,6 +41,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       override def afterEach() {
         try {
           resetSparkContext()
    +      // Reset semaphores if used by multiple tests.
    +      // Note: if other semaphores are shared by multiple tests, please reset them in this block
    +      JobCancellationSuite.taskStartedSemaphore.drainPermits()
    +      JobCancellationSuite.taskCancelledSemaphore.drainPermits()
    --- End diff --
    
    nit: for simplicity, I'd like to reset all semaphores here, instead of thinking about which ones are shared.
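
One way the "reset everything" idea could be sketched, assuming the semaphores are collected in a single sequence. The `SharedSemaphores` object and its `all` field are hypothetical; only the two semaphore names and `drainPermits()` come from the diff:

```scala
import java.util.concurrent.Semaphore

object SharedSemaphores {
  // Names mirror the suite's semaphores; grouping them in `all` is an assumption.
  val taskStartedSemaphore = new Semaphore(0)
  val taskCancelledSemaphore = new Semaphore(0)

  // Every semaphore is listed once, so nobody has to reason about which ones are shared.
  val all: Seq[Semaphore] = Seq(taskStartedSemaphore, taskCancelledSemaphore)

  // Drop any permits left over from a previous test.
  def resetAll(): Unit = all.foreach(_.drainPermits())
}
```

Calling `SharedSemaphores.resetAll()` from `afterEach()` would then cover any semaphore added to `all` later.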


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170843555
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    +    f.cancel()
    +
    +    val e = intercept[SparkException] { f.get() }.getCause
    +    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
    +
    +    // Make sure tasks are indeed completed.
    +    taskCompletedSem.acquire(numSlice)
    +    assert(executionOfInterruptibleCounter.get() <= 11)
    --- End diff --
    
    Why is it 11 and not 10?


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170845905
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    +    f.cancel()
    +
    +    val e = intercept[SparkException] { f.get() }.getCause
    +    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
    +
    +    // Make sure tasks are indeed completed.
    +    taskCompletedSem.acquire(numSlice)
    +    assert(executionOfInterruptibleCounter.get() <= 11)
    --- End diff --
    
    For simplicity, can we just test 1 partition/task?


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r171148057
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,63 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 1
    --- End diff --
    
    Can we hardcode it? Using a variable makes people feel like they can change its value and the test will still pass. However, that's not true, as `assert(executionOfInterruptibleCounter.get() <= 10)` would need to be updated too.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87813/
    Test FAILed.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170847138
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    +    f.cancel()
    +
    +    val e = intercept[SparkException] { f.get() }.getCause
    +    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
    +
    +    // Make sure tasks are indeed completed.
    +    taskCompletedSem.acquire(numSlice)
    +    assert(executionOfInterruptibleCounter.get() <= 11)
    --- End diff --
    
    Of course we can. I chose 2 because a Spark job normally has multiple partitions.
    
    It's your call whether to go with a single partition or stick with this one.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170457215
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was completed(either finished/cancelled).
    --- End diff --
    
    To fit the 100-character line limit, `or` is replaced by `/`.
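
The two ideas in this change, an iterator that notices cancellation and a completion callback that releases resources, can be illustrated with a small standalone sketch. It is loosely modelled on the behaviour described in the PR and does not use Spark's real classes; `SketchTaskContext` and `SketchInterruptibleIterator` are hypothetical names, and throwing `InterruptedException` is an assumption made for simplicity:

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified stand-in for a task context: a kill flag plus completion callbacks.
final class SketchTaskContext {
  private val killed = new AtomicBoolean(false)
  private var callbacks: List[() => Unit] = Nil

  def kill(): Unit = killed.set(true)
  def isKilled: Boolean = killed.get()

  // Register cleanup to run when the task ends, whether it finished or was cancelled.
  def addCompletionCallback(f: () => Unit): Unit = callbacks = f :: callbacks
  def markCompleted(): Unit = callbacks.foreach(f => f())
}

// Wraps an iterator and checks the kill flag before reporting that more elements exist.
final class SketchInterruptibleIterator[T](ctx: SketchTaskContext, underlying: Iterator[T])
    extends Iterator[T] {
  override def hasNext: Boolean = {
    if (ctx.isKilled) throw new InterruptedException("task killed")
    underlying.hasNext
  }
  override def next(): T = underlying.next()
}
```

A consumer that loops with `hasNext` stops at the next element once `kill()` has been called, and the registered callback (for example, one that stops a sorter) still runs when `markCompleted()` is invoked.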


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170844554
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    +    f.cancel()
    +
    +    val e = intercept[SparkException] { f.get() }.getCause
    --- End diff --
    
    will do


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87723 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87723/testReport)** for PR 20449 at commit [`d6ed9a1`](https://github.com/apache/spark/commit/d6ed9a15e24c414251ffaf09839eb5b3c0567d75).
     * This patch **fails from timeout after a configured wait of \`300m\`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87723/
    Test FAILed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    I understand your intention. I was wondering whether we actually hit this issue in production environments, or whether you have minimal code to reproduce it?


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170842725
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    +    f.cancel()
    --- End diff --
    
    We should add a comment to explain the state when we reach here. From what I can see:
    1. `taskStartedSemaphore.release()` must be called, so task is started.
    2. the task has processed no more than 10 records, because the reduce stage is not finished and `taskCancelledSemaphore.acquire()` will be blocked.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170844335
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    --- End diff --
    
    Yes


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r171519778
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -40,6 +41,10 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
       override def afterEach() {
         try {
           resetSparkContext()
    +      // Reset semaphores if used by multiple tests.
    +      // Note: if other semaphores are shared by multiple tests, please reset them in this block
    +      JobCancellationSuite.taskStartedSemaphore.drainPermits()
    +      JobCancellationSuite.taskCancelledSemaphore.drainPermits()
    --- End diff --
    
    > Maybe some comments are needed in semaphore declaration
    
    +1. It also helps reviewers; otherwise they have to work out for themselves whether a semaphore is shared.
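    
    For illustration, a minimal sketch of why leftover permits matter. `SharedSemaphores` and `reset` are hypothetical names, not the suite's real declarations; only `java.util.concurrent.Semaphore` is real:
    
    ```scala
    import java.util.concurrent.Semaphore
    
    // Hypothetical stand-in for a semaphore that several tests share.
    object SharedSemaphores {
      val taskStarted = new Semaphore(0)
    
      // Discards any permits a previous test left behind and
      // returns how many were dropped.
      def reset(): Int = taskStarted.drainPermits()
    }
    ```
    
    Without such a reset, a later test's `acquire()` could return immediately on a stale permit instead of waiting for its own task to start.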


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170844540
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    +    f.cancel()
    --- End diff --
    
    will do


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87701 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87701/testReport)** for PR 20449 at commit [`88e86e0`](https://github.com/apache/spark/commit/88e86e0ef2fc069cb0c6531979b9ae713bc88c90).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    > I'm not sure, let's just try it :)
    
    All right, I finally tracked down why it's hanging on Jenkins.
    The global semaphores used by `interruptible iterator of shuffle reader` are interfered with by other tasks.
    
    Please check the latest change, @cloud-fan 



---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by jiangxb1987 <gi...@git.apache.org>.
Github user jiangxb1987 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r166857525
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,18 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was cancelled.
    +        context.addTaskCompletionListener(tc => {
    +          // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
    +          // CompletionIterator. Another way would be making sorter.stop idempotent.
    +          if (tc.isInterrupted()) { sorter.stop() }
    --- End diff --
    
    I may be missing something obvious, but it seems `ExternalSorter.stop()` is already idempotent?


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    > I was wondering do we actually meet this issue in production envs,
    
    @jerryshao I hit this issue in production while debugging a Spark job. I noticed that the aborted stage's tasks continue running until they finish.
    
    I cannot give minimal reproduction code, since the failure is related to our mixed (online and offline services) hosts. But you can have a look at the test case I added; it essentially captures the transformation I used, except for the async part.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Hi @jerryshao, I didn't see an exception. The issue is:
    When the stage is aborted and all the remaining tasks are killed, those tasks are not actually cancelled but continue running, which wastes executor resources.
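    
    To make the expected behaviour concrete, here is a minimal sketch of an iterator that stops once a kill flag is set. `KillableIterator` and `TaskKilled` are hypothetical names for illustration; this is not Spark's actual implementation:
    
    ```scala
    import java.util.concurrent.atomic.AtomicBoolean
    
    // Hypothetical exception type; Spark's own classes are not used here.
    class TaskKilled extends RuntimeException("task killed")
    
    // Wraps an iterator and checks a kill flag before handing out each element.
    class KillableIterator[T](killed: AtomicBoolean, delegate: Iterator[T]) extends Iterator[T] {
      override def hasNext: Boolean = {
        // Fail fast instead of continuing to consume the underlying data.
        if (killed.get()) throw new TaskKilled
        delegate.hasNext
      }
    
      override def next(): T = delegate.next()
    }
    ```
    
    An iterator that never performs such a check keeps producing elements after the kill request, which is the wasted work described above.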


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    ping @cloud-fan and @jiangxb1987.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r165066153
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,18 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was cancelled.
    +        context.addTaskCompletionListener(tc => {
    +          // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
    +          // CompletionIterator. Another way would be making sorter.stop idempotent.
    +          if (tc.isInterrupted()) { sorter.stop() }
    --- End diff --
    
    One advantage of `CompletionIterator` is that the `completionFunction` is called as soon as the wrapped iterator is consumed. So the sorter releases memory earlier, rather than at task completion.
    
    As for job cancellation, it's not just `CompletionIterator` that we should consider. The combiner and sorter pattern (or similar) is something we should look for:
    ``` scala
    combiner.insertAll(iterator) // or sorter.insertAll(iterator)
    // then returns new iterator
    combiner.iterator // or sorter.iterator
    ```
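    
    The early-release behaviour of a completion iterator can be sketched as follows. `CompletingIterator` is a hypothetical helper written for this example, not Spark's `CompletionIterator` itself:
    
    ```scala
    // Hypothetical helper: runs `onComplete` exactly once, as soon as the
    // wrapped iterator reports that it has no more elements.
    class CompletingIterator[T](delegate: Iterator[T], onComplete: () => Unit) extends Iterator[T] {
      private var completed = false
    
      override def hasNext: Boolean = {
        val more = delegate.hasNext
        if (!more && !completed) {
          completed = true
          onComplete() // e.g. release the sorter's resources
        }
        more
      }
    
      override def next(): T = delegate.next()
    }
    ```
    
    Note that the callback only fires if the iterator is fully consumed. If the task is killed midway, the callback never runs, which is why a task completion listener is needed as a fallback.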



---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170844986
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    +    f.cancel()
    +
    +    val e = intercept[SparkException] { f.get() }.getCause
    +    assert(e.getMessage.contains("cancelled") || e.getMessage.contains("killed"))
    +
    +    // Make sure tasks are indeed completed.
    +    taskCompletedSem.acquire(numSlice)
    +    assert(executionOfInterruptibleCounter.get() <= 11)
    --- End diff --
    
    See my comments: https://github.com/apache/spark/pull/20449/files#diff-f870525c548cfe030ff3a2e489b33c96R341


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170821804
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,63 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    --- End diff --
    
    `f.cancel()` should be called before these partitions (tasks) finish, and we want to make sure these tasks can be cancelled.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87756 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87756/testReport)** for PR 20449 at commit [`8c15c56`](https://github.com/apache/spark/commit/8c15c564c7d2d0adc0cfd725e34dbd359c6a0ab6).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r166864792
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,18 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was cancelled.
    +        context.addTaskCompletionListener(tc => {
    +          // Note: we only stop sorter if cancelled as sorter.stop wouldn't be called in
    +          // CompletionIterator. Another way would be making sorter.stop idempotent.
    +          if (tc.isInterrupted()) { sorter.stop() }
    --- End diff --
    
    > I may be missing something obvious, but seems ExternalSorter.stop() is already idempotent?
    
    Ah, yes. After another look, it's indeed idempotent. 
    Will update the code.
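    
    As a reminder of what idempotent means here, a minimal sketch with a hypothetical `StoppableResource` (this does not reproduce `ExternalSorter`'s internals): calling `stop()` from both the completion iterator and the task completion listener is safe when the cleanup runs at most once.
    
    ```scala
    import java.util.concurrent.atomic.AtomicBoolean
    
    // Hypothetical resource: cleanup runs at most once, however often stop() is called.
    class StoppableResource {
      private val stopped = new AtomicBoolean(false)
      @volatile var cleanups = 0
    
      def stop(): Unit = {
        // Only the first caller wins the compareAndSet and performs the cleanup.
        if (stopped.compareAndSet(false, true)) {
          cleanups += 1 // stands in for releasing memory and deleting spill files
        }
      }
    }
    ```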


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87701/
    Test FAILed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    @cloud-fan is it possible to also merge this into branch-2.3, so this fix can be released in Spark 2.3.1?


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170844519
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    --- End diff --
    
    As soon as one task starts, we can cancel the job.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by jerryshao <gi...@git.apache.org>.
Github user jerryshao commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    I see. Thanks.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r167158719
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was cancelled.
    --- End diff --
    
    `if task is completed(either finished or canceled)`


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87845/
    Test PASSed.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170843830
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    --- End diff --
    
    Do you mean that without your PR, the task will keep running and hit this line 1000 times after cancelling?


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170501431
  
    --- Diff: core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala ---
    @@ -104,9 +104,16 @@ private[spark] class BlockStoreShuffleReader[K, C](
             context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled)
             context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled)
             context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes)
    +        // Use completion callback to stop sorter if task was completed(either finished/cancelled).
    --- End diff --
    
    then we can just write `if task was finished/cancelled.`


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170843441
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,58 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val taskCompletedSem = new Semaphore(0)
    +
    +    sc.addSparkListener(new SparkListener {
    +      override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    +        // release taskCancelledSemaphore when cancelTasks event has been posted
    +        if (stageCompleted.stageInfo.stageId == 1) {
    +          taskCancelledSemaphore.release(1000)
    +        }
    +      }
    +
    +      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    +        if (taskEnd.stageId == 1) { // make sure tasks are completed
    +          taskCompletedSem.release()
    +        }
    +      }
    +    })
    +
    +    taskStartedSemaphore.acquire()
    --- End diff --
    
    why not `taskStartedSemaphore.acquire(numSlice)`?


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170822587
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,63 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val sem = new Semaphore(0)
    +    val taskCompletedSem = new Semaphore(0)
    +    Future {
    +      taskStartedSemaphore.acquire()
    +      f.cancel()
    --- End diff --
    
    Line 372: `sem.acquire()` is blocked by this `Future` block, but it looks like we don't need `Future` or `sem` here. I will update the code.
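
The handshake the test relies on can be sketched outside Spark. This is a rough illustration only, not the code in `JobCancellationSuite`: `KillableIterator`, `CancelHandshakeSketch`, `runSketch` and the `killed` flag are hypothetical names, and a plain thread stands in for the Spark task. As far as I understand it, Spark's `InterruptibleIterator` performs the analogous check against the `TaskContext` on each `hasNext`. The calling thread waits for the start signal and cancels directly, with no `Future` or extra semaphore.

```scala
import java.util.concurrent.Semaphore
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

// Hypothetical stand-in for an interruptible iterator: it checks a kill flag
// on every hasNext call and aborts the iteration once the flag is set.
class KillableIterator[T](killed: AtomicBoolean, delegate: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = {
    if (killed.get()) throw new IllegalStateException("task killed")
    delegate.hasNext
  }
  override def next(): T = delegate.next()
}

object CancelHandshakeSketch {
  // Returns how many elements the worker processed before the kill took effect.
  def runSketch(): Int = {
    val taskStarted = new Semaphore(0)    // released once the worker is running
    val taskCancelled = new Semaphore(0)  // released after the kill flag is set
    val killed = new AtomicBoolean(false) // assumption: models the task's kill state
    val counter = new AtomicInteger(0)

    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        taskStarted.release()
        try {
          new KillableIterator(killed, (1 to 1000).iterator).foreach { x =>
            // Block from element 10 onwards until the canceller lets us continue.
            if (x >= 10) taskCancelled.acquire()
            counter.getAndIncrement()
          }
        } catch {
          case _: IllegalStateException => // iteration aborted by the kill flag
        }
      }
    })
    worker.start()

    taskStarted.acquire()   // wait until the worker has started
    killed.set(true)        // stand-in for f.cancel()
    taskCancelled.release() // unblock the worker so it reaches the next hasNext
    worker.join()
    counter.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"elements processed: ${runSketch()}")
  }
}
```

Because the flag is set before the semaphore is released, the worker can finish at most the element it was blocked on, so the count never exceeds 10 in this single-worker sketch. The exact value depends on how early the kill lands.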


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87756 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87756/testReport)** for PR 20449 at commit [`8c15c56`](https://github.com/apache/spark/commit/8c15c564c7d2d0adc0cfd725e34dbd359c6a0ab6).


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87822/
    Test PASSed.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    LGTM


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170501590
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +319,55 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if ( x._1 >= 10) { // this block of code is partially executed.
    --- End diff --
    
    no space after `if (`


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    **[Test build #87813 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87813/testReport)** for PR 20449 at commit [`756e0b7`](https://github.com/apache/spark/commit/756e0b7336fff3c72eca70c2ab489600211b9253).
     * This patch **fails from timeout after a configured wait of `300m`**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by advancedxy <gi...@git.apache.org>.
Github user advancedxy commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    ping @cloud-fan 


---



[GitHub] spark issue #20449: [SPARK-23040][CORE]: Returns interruptible iterator for ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20449
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87725/
    Test FAILed.


---



[GitHub] spark pull request #20449: [SPARK-23040][CORE]: Returns interruptible iterat...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20449#discussion_r170821083
  
    --- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala ---
    @@ -320,6 +321,63 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
         f2.get()
       }
     
    +  test("Interruptible iterator of shuffle reader") {
    +    // In this test case, we create a Spark job of two stages. The second stage is cancelled during
    +    // execution and a counter is used to make sure that the corresponding tasks are indeed
    +    // cancelled.
    +    import JobCancellationSuite._
    +    val numSlice = 2
    +    sc = new SparkContext(s"local[$numSlice]", "test")
    +
    +    val f = sc.parallelize(1 to 1000, numSlice).map { i => (i, i) }
    +      .repartitionAndSortWithinPartitions(new HashPartitioner(2))
    +      .mapPartitions { iter =>
    +        taskStartedSemaphore.release()
    +        iter
    +      }.foreachAsync { x =>
    +        if (x._1 >= 10) {
    +          // This block of code is partially executed. It will be blocked when x._1 >= 10 and the
    +          // next iteration will be cancelled if the source iterator is interruptible. Then in this
    +          // case, the maximum num of increment would be 11(|1...10| + |N|) where N is the first
    +          // element in another partition(assuming no ordering guarantee).
    +          taskCancelledSemaphore.acquire()
    +        }
    +        executionOfInterruptibleCounter.getAndIncrement()
    +    }
    +
    +    val sem = new Semaphore(0)
    +    val taskCompletedSem = new Semaphore(0)
    +    Future {
    +      taskStartedSemaphore.acquire()
    +      f.cancel()
    --- End diff --
    
    what's the expectation for when this `f.cancel()` should be called?


---
