Posted to reviews@spark.apache.org by jinxing64 <gi...@git.apache.org> on 2017/01/17 17:02:28 UTC

[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should handle stage's ...

GitHub user jinxing64 opened a pull request:

    https://github.com/apache/spark/pull/16620

    [SPARK-19263] DAGScheduler should handle stage's pendingPartitions properly in handleTaskCompletion.

    ## What changes were proposed in this pull request?
    In the current `DAGScheduler.handleTaskCompletion` code, when `event.reason` is `Success`, it first does `stage.pendingPartitions -= task.partitionId`, which may be a bug when `FetchFailed` happens. Consider the following sequence:
    1. There are two executors, A and B; executorA is assigned ShuffleMapTask1 and ShuffleMapTask2;
    2. ShuffleMapTask1 wants to fetch blocks locally but fails;
    3. Driver receives the `FetchFailed` caused by ShuffleMapTask1 on executorA, marks executorA as lost, and updates `failedEpoch`;
    4. Driver resubmits the stage, which contains ShuffleMapTask1x and ShuffleMapTask2x;
    5. ShuffleMapTask2 finishes successfully on executorA and sends `Success` back to the driver;
    6. Driver receives `Success` and does `stage.pendingPartitions -= task.partitionId`, but then finds that the task's epoch is not big enough (`<= failedEpoch(execId)`), treats it as bogus, and does not add the `MapStatus` to the stage;
    7. ShuffleMapTask1x finishes successfully on executorB;
    8. Driver receives `Success` from ShuffleMapTask1x on executorB and does `stage.pendingPartitions -= task.partitionId`, so no pending partitions remain, but it then finds that not all partitions are available because of step 6;
    9. Driver resubmits the stage, but at this moment ShuffleMapTask2x is still running; in `TaskSchedulerImpl.submitTasks`, it finds a `conflictingTaskSet` and throws `IllegalStateException`;
    10. The job fails.
    To reproduce the bug:
    1. We need to modify `ShuffleBlockFetcherIterator`: check whether the task's index in `TaskSetManager` and the stage attempt are both equal to 0, and if so, throw `FetchFailedException`;
    2. Rebuild Spark, then submit the following job:
    ```
        val rdd = sc.parallelize(List((0, 1), (1, 1), (2, 1), (3, 1), (1, 2), (0, 3), (2, 1), (3, 1)), 2)
        rdd.reduceByKey {
          (v1, v2) => {
            Thread.sleep(10000)
            v1 + v2
          }
        }.map {
          keyAndValue => {
            (keyAndValue._1 % 2, keyAndValue._2)
          }
        }.reduceByKey {
          (v1, v2) => {
            Thread.sleep(10000)
            v1 + v2
          }
        }.collect
    ```
    ## How was this patch tested?
    This patch was tested manually; with it applied, the bug can no longer be reproduced in the situation above.
    It is hard to write a unit test because we would have to mock the behavior of both `DAGScheduler` and `TaskScheduler`.
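
The ten-step scenario in the description can be replayed on a small model. The sketch below is only an illustration: `StageModel`, `Completion`, `handleSuccess` and `replay` are hypothetical names, not Spark classes, and the epoch values are assumed for the example. It keeps the order of operations the description attributes to `handleTaskCompletion` (remove the partition first, then check the epoch).

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the bookkeeping in the description; not Spark's real classes.
object PendingPartitionsSketch {
  final case class Completion(partitionId: Int, execId: String, epoch: Long)

  final class StageModel(val numPartitions: Int) {
    val pendingPartitions: mutable.Set[Int] = mutable.Set.empty[Int]
    pendingPartitions ++= (0 until numPartitions)
    val outputLocs: mutable.Map[Int, String] = mutable.Map.empty[Int, String]
    def isAvailable: Boolean = outputLocs.size == numPartitions
  }

  // Same order as described: the partition is removed before the epoch check.
  def handleSuccess(stage: StageModel, failedEpoch: Map[String, Long], c: Completion): Unit = {
    stage.pendingPartitions -= c.partitionId
    val bogus = failedEpoch.get(c.execId).exists(c.epoch <= _)
    if (!bogus) stage.outputLocs(c.partitionId) = c.execId
  }

  // Replays steps 5 to 8 and returns (pendingPartitions is empty, stage is available).
  def replay(): (Boolean, Boolean) = {
    val stage = new StageModel(2)
    // Step 3: executorA is marked lost; the epoch value 1 is an assumption for this sketch.
    val failedEpoch = Map("executorA" -> 1L)
    // Step 6: late Success for partition 1 from executorA, launched at the older epoch 0.
    handleSuccess(stage, failedEpoch, Completion(1, "executorA", 0L))
    // Step 8: Success for partition 0 from executorB.
    handleSuccess(stage, failedEpoch, Completion(0, "executorB", 1L))
    (stage.pendingPartitions.isEmpty, stage.isAvailable)
  }
}
```

In this model `replay()` ends with an empty `pendingPartitions` while the stage is still not available, which is the state that leads to the resubmission in step 9.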

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jinxing64/spark SPARK-19263

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/16620.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #16620
    
----
commit 9e4aab2addf2c8ed5e208938532f2fcbaf3547c0
Author: jinxing <ji...@meituan.com>
Date:   2017-01-17T16:49:02Z

    [SPARK-19263] DAGScheduler should handle stage's pendingPartitions properly in handleTaskCompletion.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #71979 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71979/testReport)** for PR 16620 at commit [`3f0ebb8`](https://github.com/apache/spark/commit/3f0ebb826985a35a90584383ccd605221d0f9c43).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    LGTM pending one last comment improvement




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r98043010
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -718,6 +703,21 @@ private[spark] class TaskSetManager(
             " because task " + index + " has already completed successfully")
         }
         maybeFinishTaskSet()
    +    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
    +    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
    +    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
    +    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
    +    // Note: "result.value()" only deserializes the value when it's called at the first time, so
    +    // here "result.value()" just returns the value and won't block other threads.
    +    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
    +    // Kill any other attempts for the same task (since those are unnecessary now that one
    +    // attempt completed successfully).
    +    for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
    +      logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
    +        s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
    +        s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
    +      sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
    +    }
    --- End diff --
    
    The `Success` is handled in `DAGScheduler` in a different thread. `DAGScheduler` may need to check the `TaskSetManager`'s status, e.g. `isZombie`. Moving the code here makes that check safe.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @squito 
    Could you please take another look at this? :)




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r97140817
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1193,7 +1193,15 @@ class DAGScheduler(
                 }
     
                 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    -              markStageAsFinished(shuffleStage)
    +              val activeTaskSetManagerExist =
    --- End diff --
    
    And since it is being used as `!activeTaskSetManagerExists`, you could reverse the sense, avoid needing the `!`, and call it something like `noActiveTaskSetManager`.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r97417399
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
    @@ -648,4 +648,69 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         }
         assertDataStructuresEmpty(noFailure = false)
       }
    +
    +  testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") {
    +    val a = new MockRDD(sc, 2, Nil)
    +    val b = shuffle(2, a)
    +    val shuffleId = b.shuffleDeps.head.shuffleId
    +
    +    def runBackend(): Unit = {
    +      val (taskDescription, task) = backend.beginTask()
    +      task.stageId match {
    +        // ShuffleMapTask
    +        case 0 =>
    +          val stageAttempt = task.stageAttemptId
    +          val partitionId = task.partitionId
    +          (stageAttempt, partitionId) match {
    +            case (0, 0) =>
    +              val fetchFailed = FetchFailed(
    +                DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")
    +              backend.taskFailed(taskDescription, fetchFailed)
    +            case (0, 1) =>
    +              // Wait until stage resubmission caused by FetchFailed is finished.
    +              waitUntilConditionBecomeTrue(taskScheduler.runningTaskSets.size==2, 5000,
    +                "Wait until stage is resubmitted caused by fetch failed")
    +
    +              // Task(stageAttempt=0, partition=1) will be bogus, because both two
    +              // tasks(stageAttempt=0, partition=0, 1) run on hostA.
    +              // Pending partitions are (0, 1) after stage resubmission,
    +              // then change to be 0 after this bogus task.
    +              backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2))
    +            case (1, 1) =>
    +              // Wait long enough until Success of task(stageAttempt=1 and partition=0)
    +              // is handled by DAGScheduler.
    +              Thread.sleep(5000)
    --- End diff --
    
    hmm, this is a nuisance.  I don't see any good way to get rid of this sleep ... but now that I think about it, why can't you do this in `DAGSchedulerSuite`?  it seems like this can be entirely contained to the `DAGScheduler` and doesn't require tricky interactions with other parts of the scheduler.  (I'm sorry I pointed you in the wrong direction earlier -- I thought perhaps you had tried to copy the examples of `DAGSchedulerSuite` but there was some reason you couldn't.)
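
For contrast with the fixed `Thread.sleep(5000)`, the earlier branch of the same test waits on an observable condition. A minimal sketch of that polling style is shown below, assuming the awaited state can be checked from the test thread. `waitUntil` is a hypothetical helper written for this illustration; it is not the `waitUntilConditionBecomeTrue` from the diff, whose implementation is not shown in this thread.

```scala
// Hypothetical polling helper for illustration; not part of Spark's test utilities.
object WaitSketch {
  // Re-evaluates `condition` every `pollMs` milliseconds until it holds or `timeoutMs` elapses.
  // Returns true if the condition became true before the deadline.
  def waitUntil(condition: => Boolean, timeoutMs: Long, pollMs: Long = 10L): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ok = condition
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(pollMs)
      ok = condition
    }
    ok
  }
}
```

Polling only helps when there is something to observe; it does not remove the sleep discussed in the comment if the awaited event leaves no visible state.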




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72376 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72376/testReport)** for PR 16620 at commit [`56aa1ca`](https://github.com/apache/spark/commit/56aa1ca8a3eb9583b003e783434655491368a178).
     * This patch **fails Scala style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72376 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72376/testReport)** for PR 16620 at commit [`56aa1ca`](https://github.com/apache/spark/commit/56aa1ca8a3eb9583b003e783434655491368a178).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72377 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72377/testReport)** for PR 16620 at commit [`e7cbea0`](https://github.com/apache/spark/commit/e7cbea014783a4582c5cfdb40059a0f61910e9c8).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72498/
    Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #3540 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3540/testReport)** for PR 16620 at commit [`9e4aab2`](https://github.com/apache/spark/commit/9e4aab2addf2c8ed5e208938532f2fcbaf3547c0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72227 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72227/testReport)** for PR 16620 at commit [`76961c3`](https://github.com/apache/spark/commit/76961c3ba64e19c43ebfc0b18651d68c54949edb).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99615906
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,96 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should avoid sending conflicting task set") {
    +    val mockTaskSchedulerImpl = new TaskSchedulerImpl(sc) {
    --- End diff --
    
    nit: I wouldn't call this a "mock"; it's pretty much the full taskScheduler with a tiny addition




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72023 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72023/testReport)** for PR 16620 at commit [`be7e701`](https://github.com/apache/spark/commit/be7e701d0f01081eec5e25e2256eb45560fbe97b).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Also, if you implement the new change I proposed, I think it's relatively straightforward to write a new test in DAGSchedulerSuite for the new behavior (which will be pretty similar to the test I modified in #16892).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    The unit test failed. I will keep working on this.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    BTW Mark, one slightly different version of your suggestion that I'd considered is:
    
    (1) move stage.pendingPartitions -= task.partitionId so that it's duplicated in each of the two case statements below
    
    (2) for the ResultTask case, removing the partition can happen right at the beginning
    
    (3) for the ShuffleMapTask case, removing the partition can happen in the else statement on line 1196, where addOutputLoc is called.
    
    One benefit of that approach is that it makes it a little more obvious which state is related: that the pendingPartitions should mirror (in reverse) the output locations for the stage.  It also consolidates the logic for handling previously failed executors into the one location.
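
The three points above can be sketched on a small model. This is a hedged illustration of the variant described in the comment, not the code that was merged: `StageModel`, `Completion`, `TaskKind`, `handleSuccess` and `replay` are hypothetical names, and the epoch values are assumed.

```scala
import scala.collection.mutable

// Hypothetical model of the variant described in the comment; not Spark's real classes.
object MirroredStateSketch {
  sealed trait TaskKind
  case object ResultTaskKind extends TaskKind
  case object ShuffleMapTaskKind extends TaskKind

  final case class Completion(kind: TaskKind, partitionId: Int, execId: String, epoch: Long)

  final class StageModel(numPartitions: Int) {
    val pendingPartitions: mutable.Set[Int] = mutable.Set.empty[Int]
    pendingPartitions ++= (0 until numPartitions)
    val outputLocs: mutable.Map[Int, String] = mutable.Map.empty[Int, String]
  }

  def handleSuccess(stage: StageModel, failedEpoch: Map[String, Long], c: Completion): Unit =
    c.kind match {
      case ResultTaskKind =>
        // Point (2): for a result task the partition is removed right at the beginning.
        stage.pendingPartitions -= c.partitionId
      case ShuffleMapTaskKind =>
        val fromFailedExecutor = failedEpoch.get(c.execId).exists(c.epoch <= _)
        // A completion from a previously failed executor changes neither collection.
        if (!fromFailedExecutor) {
          // Point (3): the partition is removed in the same branch that records the output.
          stage.outputLocs(c.partitionId) = c.execId
          stage.pendingPartitions -= c.partitionId
        }
    }

  // Replays a late completion from a failed executor followed by a valid one.
  // Returns (remaining pending partitions, partitions with a recorded output).
  def replay(): (Set[Int], Set[Int]) = {
    val stage = new StageModel(2)
    val failedEpoch = Map("executorA" -> 1L) // assumed epoch for this sketch
    handleSuccess(stage, failedEpoch, Completion(ShuffleMapTaskKind, 1, "executorA", 0L))
    handleSuccess(stage, failedEpoch, Completion(ShuffleMapTaskKind, 0, "executorB", 1L))
    (stage.pendingPartitions.toSet, stage.outputLocs.keySet.toSet)
  }
}
```

In this model partition 1 stays pending after the ignored completion, so the pending set and the recorded outputs remain complements of each other.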




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r98703486
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/Stage.scala ---
    @@ -68,6 +68,12 @@ private[scheduler] abstract class Stage(
       /** Set of jobs that this stage belongs to. */
       val jobIds = new HashSet[Int]
     
    +  /**
    +   * Partitions which there is not yet a task succeeded on. Note that for [[ShuffleMapStage]]
    +   * pendingPartitions.size() == 0 doesn't mean the stage is available. Because the succeeded
    +   * task can be bogus which is out of date and task's epoch is older than corresponding
    +   * executor's failed epoch in [[DAGScheduler]].
    +   */
    --- End diff --
    
    How about:
    
    Partitions the DAGScheduler is waiting on before it tries to mark the stage / job as completed and continue.  Most commonly, this is the set of tasks that are not successful in the active taskset for this stage, but not always.  In particular, when there are multiple attempts for a stage, then this will include late task completions from earlier attempts.  Finally, note that when this is empty, it does not *necessarily* mean that stage is completed -- we may have lost some of the map output from that stage.  But the DAGScheduler will check for this condition and resubmit the stage if necessary.
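
The last two sentences of the proposed comment describe a three-way outcome, which can be written as a small decision function. This is a deliberately simplified sketch: `Decision`, `decide` and its parameters are hypothetical, and the real scheduler tracks more state than two sets.

```scala
// Hypothetical simplification of the behaviour described in the proposed comment.
object StageCompletionSketch {
  sealed trait Decision
  case object KeepWaiting extends Decision // some partitions are still pending
  case object Proceed extends Decision     // nothing pending and all map output is present
  case object Resubmit extends Decision    // nothing pending but some map output is missing

  def decide(pendingPartitions: Set[Int], availableOutputs: Set[Int], numPartitions: Int): Decision =
    if (pendingPartitions.nonEmpty) KeepWaiting
    else if (availableOutputs.size == numPartitions) Proceed
    else Resubmit
}
```

For example, `decide(Set.empty, Set(0), 2)` yields `Resubmit`: an empty pending set alone does not show that the stage's output is complete.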




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Thanks for the work thus far, @jinxing64, but this really needs updated test coverage before we can consider merging it.
    
    @squito 




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r100922127
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    +    " even with late completions from earlier stage attempts") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    --- End diff --
    
    can you comment this like I suggested in your other PR?




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99615666
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,96 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should avoid sending conflicting task set") {
    +    val mockTaskSchedulerImpl = new TaskSchedulerImpl(sc) {
    +      override def submitTasks(taskSet: TaskSet): Unit = {
    +        super.submitTasks(taskSet)
    +        taskSets += taskSet
    +      }
    +    }
    +    val mockDAGScheduler = new DAGScheduler(
    +      sc,
    +      mockTaskSchedulerImpl,
    +      sc.listenerBus,
    +      mapOutputTracker,
    +      blockManagerMaster,
    +      sc.env
    +    ) {
    +      override def taskEnded(
    +                              task: Task[_],
    +                              reason: TaskEndReason,
    +                              result: Any,
    +                              accumUpdates: Seq[AccumulatorV2[_, _]],
    +                              taskInfo: TaskInfo): Unit = {
    +        dagEventProcessLoopTester.post(
    +          CompletionEvent(task, reason, result, accumUpdates, taskInfo))
    +      }
    +    }
    +
    +    val mockSchedulerBackend = new SchedulerBackend {
    +      override def stop(): Unit = {}
    +
    +      override def defaultParallelism(): Int = 2
    +
    +      override def reviveOffers(): Unit = {}
    +
    +      override def start(): Unit = {}
    +    }
    +
    +    def getTaskSetManagerByTask(task: Task[_]): TaskSetManager = {
    +      val taskSetManagerOpt = mockTaskSchedulerImpl
    +        .taskSetManagerForAttempt(task.stageId, task.stageAttemptId)
    +      assert(taskSetManagerOpt.isDefined)
    +      taskSetManagerOpt.get
    +    }
    +
    +    def resourceOffer(taskSetManager: TaskSetManager, host: String, execId: String): Unit = {
    +      taskSetManager.resourceOffer(execId, host, TaskLocality.ANY)
    +    }
    +
    +    def taskSuccessful(tsm: TaskSetManager, task: Task[_], result: Any): Unit = {
    +      val taskIdOpt = tsm.taskInfos.find(_._2.index == task.partitionId)
    --- End diff --
    
    this would be a little cleaner if it didn't need the `tsm` passed in, instead inside it did `val tsm = getTaskSetManagerByTask(task)`




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r101392190
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1181,15 +1181,34 @@ class DAGScheduler(
     
               case smt: ShuffleMapTask =>
                 val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
    -            shuffleStage.pendingPartitions -= task.partitionId
                 updateAccumulators(event)
                 val status = event.result.asInstanceOf[MapStatus]
                 val execId = status.location.executorId
                 logDebug("ShuffleMapTask finished on " + execId)
    +            if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
    +              // This task was for the currently running attempt of the stage. Since the task
    +              // completed successfully from the perspective of the TaskSetManager, mark it as
    +              // no longer pending (the TaskSetManager may consider the task complete even
    +              // when the output needs to be ignored because the task's epoch is too small below,
    +              // if so, this can result in inconsistency between pending partitions and output
    +              // locations of stage. When pending partitions is empty, the scheduler will check
    +              // output locations, if there is missing, the stage will be resubmitted.
    --- End diff --
    
    one more proposal to improve this comment:
    
    ...epoch is too small below.  In this case, when pending partitions is empty, there will still be missing output locations, which will cause the DAGScheduler to resubmit the stage below.)
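
The behaviour this comment describes can be sketched roughly as follows. This is a simplified model under stated assumptions, not the DAGScheduler code: `Completion`, `ShuffleStageState` and `handleSuccess` are hypothetical names, and only the two checks discussed in the diff (the attempt ID and the `failedEpoch` comparison) are modelled.

```scala
import scala.collection.mutable

// Hypothetical summary of a successful ShuffleMapTask completion event.
final case class Completion(partitionId: Int, stageAttemptId: Int, epoch: Long, execId: String)

// Hypothetical stand-in for the per-stage state the scheduler tracks.
final class ShuffleStageState(numPartitions: Int, var latestAttemptId: Int) {
  val pendingPartitions: mutable.Set[Int] = mutable.HashSet[Int]() ++= (0 until numPartitions)
  val outputLocs: mutable.Map[Int, String] = mutable.HashMap[Int, String]()
  val failedEpoch: mutable.Map[String, Long] = mutable.HashMap[String, Long]()

  def handleSuccess(c: Completion): Unit = {
    // Only a completion from the currently running attempt clears the pending partition.
    if (c.stageAttemptId == latestAttemptId) pendingPartitions -= c.partitionId
    // Output is ignored when the task's epoch is not newer than the executor's failure epoch.
    val bogus = failedEpoch.get(c.execId).exists(failed => c.epoch <= failed)
    if (!bogus) outputLocs(c.partitionId) = c.execId
  }
}
```

With this shape, a late success from attempt 0 on a failed executor changes neither set, so the partition stays pending until the current attempt reports it.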




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    LGTM! Thanks for finding this subtle bug and all of the hard work to fix it @jinxing64. I'll wait until tomorrow to merge this to give Mark and Imran a chance for any last comments.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    I spent a long time looking at this and I think @markhamstra's solution is the way to go, and that we should update the tests to address the failures (I think the two tests that fail are actually partially testing for incorrect behavior, which is why they're failing).  I'll post a longer writeup tomorrow midday, but wanted to provide a quick status update @jinxing64 so that you know progress is being made here!
    
    I'll also merge #16876 tomorrow, assuming tests pass, which will help slightly with implementing Mark's suggestion.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r100922388
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    +    " even with late completions from earlier stage attempts") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostA", 2))))
    +
    +    // Fetch failed on hostA.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
    +        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
    +      null))
    +
    +    scheduler.resubmitFailedStages()
    +
    +    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
    +    complete(taskSets(2), Seq(
    +      (Success, makeMapStatus("hostB", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // Task succeeds on a failed executor. The success is bogus.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
    +
    +    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
    +    runEvent(makeCompletionEvent(
    +      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
    +
    +    // There should be no new attempt of stage submitted.
    +    assert(taskSets.size === 4)
    --- End diff --
    
    is this the line that would fail without your change? (just verifying my understanding)




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r100922270
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    +    " even with late completions from earlier stage attempts") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostA", 2))))
    +
    +    // Fetch failed on hostA.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
    +        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
    +      null))
    +
    +    scheduler.resubmitFailedStages()
    +
    +    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
    +    complete(taskSets(2), Seq(
    +      (Success, makeMapStatus("hostB", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // Task succeeds on a failed executor. The success is bogus.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
    +
    +    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
    +    runEvent(makeCompletionEvent(
    +      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
    +
    +    // There should be no new attempt of stage submitted.
    --- End diff --
    
    can you add "because task 1 is still running in the current attempt (and hasn't completed successfully in any earlier attempts)."




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99616688
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1191,8 +1191,29 @@ class DAGScheduler(
                 } else {
                   shuffleStage.addOutputLoc(smt.partitionId, status)
                 }
    -
                 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    +              // Check if there is active TaskSetManager.
    +              val activeTaskSetManagerOpt = Option(taskScheduler.rootPool).flatMap { rootPool =>
    +                rootPool.getSortedTaskSetQueue.find { tsm =>
    +                  tsm.stageId == stageId && !tsm.isZombie
    +                }
    +              }
    +              activeTaskSetManagerOpt.foreach { activeTsm =>
    +                // The scheduler thinks we don't need any more partitions for this stage, but there
    +                // is still an active taskset for the stage.  This can happen when there are stage
    +                // retries, and we get late task completions from earlier stages.  Note that all of
    +                // the map output may or may not be available -- some of those map outputs may have
    +                // been lost.  But the most consistent way to make that determination is to end
    +                // the running taskset, and mark the stage as finished.  The DAGScheduler will
    +                // automatically determine whether there are still partitions missing that need to
    +                // be resubmitted.
    +                // NOTE: this will get a lock on the TaskScheduler
    --- End diff --
    
    Sorry this is my fault -- I gave you a bad comment here.  Can we reword that line to
    
    // We need a lock on the taskScheduler because tsm is not thread-safe, it assumes that all interactions have a lock on the taskScheduler, even just setting isZombie.
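
For readers unfamiliar with this locking convention, here is a rough sketch of the pattern the reworded comment describes: a manager object with no synchronization of its own, where every access is expected to hold the owning scheduler's lock. `ManagerSketch`, `SchedulerSketch` and `markStageAsZombie` are hypothetical names, not Spark's API.

```scala
import scala.collection.mutable

// Hypothetical manager with no internal synchronization.
final class ManagerSketch(val stageId: Int) {
  var isZombie: Boolean = false
}

// Hypothetical scheduler whose monitor guards all manager state.
final class SchedulerSketch {
  private val managers = mutable.ArrayBuffer[ManagerSketch]()

  def register(m: ManagerSketch): Unit = synchronized {
    managers += m
    ()
  }

  // Even a single flag write goes through the scheduler's lock.
  // Returns how many active managers were marked.
  def markStageAsZombie(stageId: Int): Int = synchronized {
    val active = managers.filter(m => m.stageId == stageId && !m.isZombie)
    active.foreach(m => m.isZombie = true)
    active.size
  }
}
```

A caller outside the scheduler would invoke `markStageAsZombie` rather than setting `isZombie` directly, so the write happens under the same lock as every other interaction.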




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72849/
    Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72023/
    Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    I'll spend some time today trying to sort out the relative merits of the fix options; but in the meantime, there's also no good reason for `TaskSchedulerImpl.rootPool` to be a `var` initialized as `null`, nor any good reason for `TaskScheduler.rootPool` to be able to produce `null`.  Cleaning that up also makes code in this PR slightly simpler: https://github.com/markhamstra/spark/commit/e11fe2a9817559492daee03c8c025879dc44d346
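
The general shape of the cleanup suggested here can be sketched as below. This does not reproduce the linked commit; `PoolSketch`, `NullableScheduler`, `EagerScheduler` and `RootPoolSketch` are hypothetical names, and the sketch only contrasts a nullable `var` with a `val` initialized at construction.

```scala
// Hypothetical pool type standing in for the scheduler's root pool.
final class PoolSketch(val name: String) {
  private var queue = Vector.empty[String]
  def add(taskSet: String): Unit = { queue = queue :+ taskSet }
  def sorted: Vector[String] = queue.sorted
}

// Before: a nullable var forces every caller to guard against null.
final class NullableScheduler {
  var rootPool: PoolSketch = null
  def initialize(): Unit = { rootPool = new PoolSketch("root") }
}

// After: the pool exists from construction, so callers use it directly.
final class EagerScheduler {
  val rootPool: PoolSketch = new PoolSketch("root")
}

object RootPoolSketch {
  // Callers of the nullable version need the Option(...) wrapper.
  def queuedBefore(s: NullableScheduler): Vector[String] =
    Option(s.rootPool).map(_.sorted).getOrElse(Vector.empty)

  // Callers of the eager version do not.
  def queuedAfter(s: EagerScheduler): Vector[String] = s.rootPool.sorted
}
```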




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72376/
    Test FAILed.




[GitHub] spark issue #16620: [WIP][SPARK-19263] DAGScheduler should avoid sending con...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72761 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72761/testReport)** for PR 16620 at commit [`0cd3188`](https://github.com/apache/spark/commit/0cd31886f157794aafc31008308632efa5fc725b).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72912 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72912/testReport)** for PR 16620 at commit [`e34cd85`](https://github.com/apache/spark/commit/e34cd85424d88daf96d0252d34f2ce28b956ddde).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72974/
    Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72497/
    Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @markhamstra  @squito 
    Thanks a lot for your helpful comments.
    I added a unit test for this fix and updated the patch. All unit tests now pass for me locally.
    This fix adds a check for an already active (non-zombie) taskSetManager before resubmission.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    As @squito mentioned:
    >Before this, the DAGScheduler didn't really know anything about taskSetManagers. (In its current form, this pr uses a "leaked" handle via rootPool.getSortedTaskSetQueue). Is adding it here a mistake? An alternative would be to add a method to TaskScheduler like markTaskSetsForStageAsZombie(stageId: Int). But that is still basically exposing the idea of "zombie" tasksets to the dagscheduler, I dunno if its actually any cleaner.
    
    I think this is a cleaner and simpler way to fix this bug, and we can avoid adding TSM info to the DAGScheduler.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    >hmm, this is a nuisance. I don't see any good way to get rid of this sleep ... but now that I think about it, why can't you do this in DAGSchedulerSuite? it seems like this can be entirely contained to the DAGScheduler and doesn't require tricky interactions with other parts of the scheduler. (I'm sorry I pointed you in the wrong direction earlier -- I thought perhaps you had tried to copy the examples of DAGSchedulerSuite but there was some reason you couldn't.)
    
    `DAGSchedulerSuite` is quite hard for me to use here, because this bug happens in the interaction between `DAGScheduler` and `TaskSchedulerImpl`: the conflicting-taskset exception is actually thrown in `TaskSchedulerImpl` when `submitTasks` is called from `DAGScheduler`. `DAGSchedulerSuite` only provides a very simple `TaskScheduler`; of course I could check for the conflict in it, but I don't think that would be convincing enough.
    
    I don't like the `Thread.sleep(5000)` either, but I didn't find a better way. I added `TestDAGScheduler` to `SchedulerIntegrationSuite`, just like `TestTaskScheduler`, to track more state; perhaps it can also be used in the future. If that is not preferred, I'm sorry.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @squito 
    Thanks a lot for your comments, they are very helpful. I've already refined the code, please take another look : )
    
    When handling `Success` of a `ShuffleMapTask`, what I want to do is check whether there are still tasks running for other partitions. If so, do not resubmit the stage when `pendingPartitions.isEmpty && !stage.isAvailable`. There are two benefits:
    1. The running tasks, when they succeed, have a chance to register their map statuses with the `ShuffleMapStage` and make it available;
    2. It avoids submitting a conflicting `taskSet`.
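
The decision described above can be sketched as a small pure function. This is only an illustration under stated assumptions: `StageView`, `Action`, and `onShuffleMapTaskSuccess` are hypothetical names invented for this sketch, not Spark classes, and the real `DAGScheduler` tracks this state differently.

```scala
object ResubmitDecision {
  // Hypothetical, simplified view of a shuffle map stage (not Spark's ShuffleMapStage).
  final case class StageView(
      pendingPartitions: Set[Int],  // partitions the current attempt still waits on
      availableOutputs: Set[Int],   // partitions that have a registered map output
      numPartitions: Int,
      runningPartitions: Set[Int]) { // partitions that still have a task running
    def isAvailable: Boolean = availableOutputs.size == numPartitions
  }

  sealed trait Action
  case object KeepGoing extends Action           // still waiting on pending partitions
  case object MarkFinished extends Action        // all outputs are registered
  case object WaitForRunningTasks extends Action // do not resubmit yet
  case object Resubmit extends Action            // nothing running, outputs missing

  // Decide what to do after a ShuffleMapTask reports Success.
  def onShuffleMapTaskSuccess(stage: StageView): Action =
    if (stage.pendingPartitions.nonEmpty) KeepGoing
    else if (stage.isAvailable) MarkFinished
    else if (stage.runningPartitions.nonEmpty) WaitForRunningTasks
    else Resubmit
}
```

With no pending partitions, one missing output, and a task still running for that partition, the sketch returns `WaitForRunningTasks` instead of `Resubmit`, which is the case that would otherwise produce a conflicting task set.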




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99618027
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,96 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should avoid sending conflicting task set") {
    --- End diff --
    
    Can we make this test name a bit more specific, maybe "DAGScheduler should not submit multiple active tasksets, even with late completions from earlier stage attempts" (or if you have another suggestion)




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72912/




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72226/




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r101070245
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1181,15 +1181,31 @@ class DAGScheduler(
     
               case smt: ShuffleMapTask =>
                 val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
    -            shuffleStage.pendingPartitions -= task.partitionId
                 updateAccumulators(event)
                 val status = event.result.asInstanceOf[MapStatus]
                 val execId = status.location.executorId
                 logDebug("ShuffleMapTask finished on " + execId)
    +            if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
    +              // This task was for the currently running attempt of the stage. Since the task
    +              // completed successfully from the perspective of the TaskSetManager, mark it as
    +              // no longer pending (the TaskSetManager may consider the task complete even
    +              // when the output needs to be ignored because the task's epoch is too small below).
    +              shuffleStage.pendingPartitions -= task.partitionId
    --- End diff --
    
    I think it's worth also explaining how this inconsistency between pendingPartitions and outputLocations gets resolved.  IIUC, when pendingPartitions is empty, the scheduler checks outputLocations, realizes something is missing, and resubmits this stage.
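
A toy model may make that resolution path concrete. It is a sketch under stated assumptions: `ToyStage` and `handleSuccess` are hypothetical, the field names only mirror the ones used in this discussion, and the epoch comparison is modeled on the `smt.epoch <= failedEpoch(execId)` check quoted in this thread.

```scala
import scala.collection.mutable

object PendingVsOutputs {
  // Toy stand-in for a shuffle map stage (not Spark's ShuffleMapStage).
  final class ToyStage(val numPartitions: Int) {
    val pendingPartitions: mutable.Set[Int] = mutable.Set((0 until numPartitions): _*)
    val outputLocs: mutable.Map[Int, String] = mutable.Map.empty
    def missingPartitions: Seq[Int] =
      (0 until numPartitions).filterNot(outputLocs.contains)
  }

  // Handles one successful task and returns the partitions that must be
  // resubmitted (empty while the stage is still waiting or is complete).
  def handleSuccess(
      stage: ToyStage,
      partition: Int,
      execId: String,
      taskEpoch: Long,
      failedEpoch: Map[String, Long]): Seq[Int] = {
    // The task set manager considers the task done, so stop waiting for it.
    stage.pendingPartitions -= partition
    // Ignore the output if the executor was marked failed at or after the task's epoch.
    val bogus = failedEpoch.get(execId).exists(taskEpoch <= _)
    if (!bogus) stage.outputLocs(partition) = execId
    // Once nothing is pending, any partition without an output must be rerun.
    if (stage.pendingPartitions.isEmpty) stage.missingPartitions else Seq.empty
  }
}
```

In this model a success from a failed executor empties `pendingPartitions` without adding to `outputLocs`, and the gap is only noticed, and turned into a resubmission, when the last pending partition completes.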




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    The way that I am thinking about this right now is that @kayousterhout is on the right track with the early return at https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1141 , but that her proposed `stage...attemptId != task.stageAttemptId` is broader than it needs to be.  My idea is that we want to be throwing away task results from earlier attempts that were run on executors that failed (on the presumption that one fetch failure means that other fetches from there are also going to fail), but that if the executor didn't fail, then the outputs from earlier attempts of tasks that complete late but successfully on still-good executors should still be valid and available, so we should accept them as though they were successful task completions for the current attempt.
    
    What you end up with is that if-statement now looking like:
    ```scala
        val stageHasBeenCancelled = !stageIdToStage.contains(task.stageId)
        val shuffleMapTaskIsFromFailedExecutor = task match {
          case smt: ShuffleMapTask =>
            val status = event.result.asInstanceOf[MapStatus]
            val execId = status.location.executorId
            failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)
          case _ => false
        }
        if (stageHasBeenCancelled || shuffleMapTaskIsFromFailedExecutor) {
          return
        }
    ```
    ...and then the `failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)` check can be removed from `case smt: ShuffleMapTask =>`.
    
    If we can do it cleanly, I think we should be avoiding re-running Tasks that complete successfully and should still be available.  This is a bit different from the intent of SPARK-14649, which I am reading as an effort not to ignore the results of long-running tasks that start and eventually complete on an executor on which some other tasks actually run into fetch failures.  I'm really only trying to preserve the results of successful tasks run on executors that haven't failed.
    
    Unfortunately, the DAGSchedulerSuite doesn't agree with my intentions, because the above change actually leads to multiple test failures.
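
To compare the two rules discussed above side by side, here is a minimal sketch. `Completion`, `dropByAttempt`, and `dropByFailedExecutor` are hypothetical names for illustration only; the first models the broader attempt-id check, the second the narrower failed-executor check.

```scala
object LateCompletionFilters {
  // Hypothetical summary of a task completion event (not a Spark class).
  final case class Completion(stageAttemptId: Int, execId: String, epoch: Long)

  // Broader rule: drop every completion that is not from the current stage attempt.
  def dropByAttempt(c: Completion, currentAttemptId: Int): Boolean =
    c.stageAttemptId != currentAttemptId

  // Narrower rule: drop only completions from an executor that was marked
  // failed at or after the task's epoch.
  def dropByFailedExecutor(c: Completion, failedEpoch: Map[String, Long]): Boolean =
    failedEpoch.get(c.execId).exists(c.epoch <= _)
}
```

A late completion from attempt 0 that ran on a still-healthy executor is dropped by the first rule but kept by the second, which is the difference in intent described in the comment.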




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r100953546
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    +    " even with late completions from earlier stage attempts") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostA", 2))))
    +
    +    // Fetch failed on hostA.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
    +        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
    +      null))
    +
    +    scheduler.resubmitFailedStages()
    +
    +    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
    +    complete(taskSets(2), Seq(
    +      (Success, makeMapStatus("hostB", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // Task succeeds on a failed executor. The success is bogus.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
    +
    +    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
    +    runEvent(makeCompletionEvent(
    +      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
    +
    +    // There should be no new attempt of stage submitted.
    +    assert(taskSets.size === 4)
    --- End diff --
    
    Yes, I think so : )




[GitHub] spark issue #16620: [WIP][SPARK-19263] DAGScheduler should avoid sending con...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72757 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72757/testReport)** for PR 16620 at commit [`c898148`](https://github.com/apache/spark/commit/c8981482c8c0c6e0a062babcf747f861038a1279).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99660324
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
    @@ -391,17 +391,18 @@ private[spark] abstract class MockBackend(
        * scheduling.
        */
       override def reviveOffers(): Unit = {
    -    val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
    -    // get the task now, since that requires a lock on TaskSchedulerImpl, to prevent individual
    -    // tests from introducing a race if they need it
    -    val newTasks = taskScheduler.synchronized {
    -      newTaskDescriptions.map { taskDescription =>
    -        val taskSet = taskScheduler.taskIdToTaskSetManager(taskDescription.taskId).taskSet
    -        val task = taskSet.tasks(taskDescription.index)
    -        (taskDescription, task)
    -      }
    -    }
    -    synchronized {
    +    // Need a lock on the entire scheduler to protect freeCores -- otherwise, multiple threads
    +    // may make offers at the same time, though they are using the same set of freeCores.
    +    taskScheduler.synchronized {
    +      val newTaskDescriptions = taskScheduler.resourceOffers(generateOffers()).flatten
    --- End diff --
    
    Is this change fixing an existing bug in the test? (if so would you mind putting the change in its own small PR that we can quickly merge?)
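
The race that the new comment in the diff refers to is a check-then-act on a shared counter. A minimal sketch, assuming a hypothetical `ToyBackend` that has nothing to do with the real `MockBackend` beyond the `freeCores` name:

```scala
object OfferLock {
  // Toy backend: grants cores out of a shared pool.
  final class ToyBackend(totalCores: Int) {
    private var freeCores = totalCores
    private val lock = new Object

    // Reading freeCores and decrementing it happen under one lock, so two
    // threads can never both be granted the same free core.
    def reviveOffers(requested: Int): Int = lock.synchronized {
      val granted = math.min(requested, freeCores)
      freeCores -= granted
      granted
    }

    def remaining: Int = lock.synchronized(freeCores)
  }
}
```

Without the lock, two callers could both read the same `freeCores` value and together be granted more cores than exist.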




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72160 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72160/testReport)** for PR 16620 at commit [`283373d`](https://github.com/apache/spark/commit/283373d722629681929ed9dd059a6cd22be1fb73).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @markhamstra 
    Thanks a lot for your comment. I've already refined the code, please take another look ~




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r97586026
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1193,7 +1193,14 @@ class DAGScheduler(
                 }
     
                 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    -              markStageAsFinished(shuffleStage)
    +              val noActiveTaskSetManager =
    +                taskScheduler.rootPool == null ||
    +                  !taskScheduler.rootPool.getSortedTaskSetQueue.exists {
    +                    tsm => tsm.stageId == stageId && !tsm.isZombie
    +                  }
    +              if (shuffleStage.isAvailable || noActiveTaskSetManager) {
    +                markStageAsFinished(shuffleStage)
    +              }
    --- End diff --
    
    I have to admit, though this passes all the tests, this is really confusing to me.  I only somewhat understand why your original version didn't work, and why this should be used instead.  Perhaps some more commenting here would help?  The condition under which you do `markStageAsFinished` seems very broad, so perhaps it's worth a comment on the case when you do *not* (and perhaps even a `logInfo` in an `else` branch).  The discrepancy between pendingPartitions and availableOutputs is also surprising -- perhaps that is worth extra comments on `Stage`, explaining how the meanings of those two differ.
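
The condition in the diff can be isolated into a small, testable predicate, which also makes the one case where the stage is *not* marked finished explicit. This is a sketch only: `ToyTaskSetManager` and `shouldMarkFinished` are hypothetical stand-ins, and the `rootPool == null` guard from the diff is left out.

```scala
object FinishCondition {
  // Hypothetical stand-in for a TaskSetManager: only the two fields the diff inspects.
  final case class ToyTaskSetManager(stageId: Int, isZombie: Boolean)

  // True when no non-zombie task set manager exists for the given stage.
  def noActiveTaskSetManager(queue: Seq[ToyTaskSetManager], stageId: Int): Boolean =
    !queue.exists(tsm => tsm.stageId == stageId && !tsm.isZombie)

  // Mirrors `shuffleStage.isAvailable || noActiveTaskSetManager` from the diff.
  def shouldMarkFinished(
      isAvailable: Boolean,
      queue: Seq[ToyTaskSetManager],
      stageId: Int): Boolean =
    isAvailable || noActiveTaskSetManager(queue, stageId)
}
```

The only combination that returns `false` is a stage with missing outputs that still has an active (non-zombie) task set manager; every other combination marks the stage as finished.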




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r100922010
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    +    " even with late completions from earlier stage attempts") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostA", 2))))
    +
    +    // Fetch failed on hostA.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
    +        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
    +      null))
    +
    +    scheduler.resubmitFailedStages()
    +
    +    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
    +    complete(taskSets(2), Seq(
    +      (Success, makeMapStatus("hostB", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // Task succeeds on a failed executor. The success is bogus.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
    +
    +    assert(taskSets(3).stageId === 1 && taskSets(2).stageAttemptId === 1)
    --- End diff --
    
    should the second part have taskSets(3) instead of taskSets(2)?




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #71874 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71874/testReport)** for PR 16620 at commit [`be8bfe5`](https://github.com/apache/spark/commit/be8bfe5a8f26e417ccf1c7315eb099c3d6b233d1).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @kayousterhout @squito @markhamstra 
    Thanks for all of your work for this patch. Really appreciate your help : )




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72028 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72028/testReport)** for PR 16620 at commit [`de19333`](https://github.com/apache/spark/commit/de19333053d8ea4fb5c0708a791070323749ef86).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @squito 
    Thanks a lot for continuing to review this. Your comments are very helpful ~
    
    >when we encounter the condition where there are no pending partitions, but there is an active taskset -- we just mark that taskset as inactive
    
    That's a good idea, and it makes the code much clearer. I've already made the change, please take another look.





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72849 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72849/testReport)** for PR 16620 at commit [`ab8d13e`](https://github.com/apache/spark/commit/ab8d13efaf12182517d3b311d74b2f0a8d2fbef8).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @kayousterhout 
    I've refined accordingly, please take another look : )




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Can one of the admins verify this patch?




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @squito 
    Thanks a lot. I've refined the comment, please take another look.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72913 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72913/testReport)** for PR 16620 at commit [`d225565`](https://github.com/apache/spark/commit/d2255654b1f6ae43ba47c0ffcec0e6adc4beed82).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test FAILed.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72226 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72226/testReport)** for PR 16620 at commit [`ed1791f`](https://github.com/apache/spark/commit/ed1791fd9b6e434ec69a6c118a433e2539fed7a4).


[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99660627
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
    @@ -648,4 +661,70 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         }
         assertDataStructuresEmpty(noFailure = false)
       }
    +
    +  testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") {
    +    val a = new MockRDD(sc, 2, Nil)
    --- End diff --
    
    can you give a and b more descriptive names (e.g., rdd and shuffledRdd)?


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @squito and @jinxing64 You're right -- with the existing code, if a task from an old attempt succeeded *and* didn't run on an executor where things already failed, the DAGScheduler will count the result (just realizing this based on [this if-statement](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1189)).
    
    That being said, I think this behavior is broken, because it leads to inconsistent state between the DAGScheduler (which thinks the stage is done and submits the next ones) and the TaskSetManager for the most recent version of the stage (which is still waiting on the more recent version of tasks to complete).  When the TaskSetManager for the most recent version of the stage finishes all of its tasks, it will tell the DAGScheduler -- again -- that the stage has finished, causing the DAGScheduler to update the finish time for the stage and send another (duplicate) SparkListenerStageCompleted message to the listeners (I think this will result in stages in the UI that appear to be finished yet still have running tasks), and re-update the outputs for the map stage.  None of these things are obviously buggy (from a cursory look) but they violate a bunch of invariants in the scheduler, and I wouldn't be surprised if there were bugs lurking in this code path.  Given the amount of debugging and reviewer time that gets dedicated to these subtle bugs, I'm in favor of the simpler solution that maintains consistent state between the DAGScheduler and TaskSetManager.
    
    @squito where has this behavior been argued against in the past?  My understanding is that a bunch of the scheduler code is based on an assumption that once some tasks in a stage fail with a FetchFailure, we ignore future successes from that stage because it makes the code much simpler (it's also hard, in some cases, to know whether the successes are "real", or delayed messages from machines that later failed).  There was a bigger effort to fix that issue in [SPARK-14649](https://issues.apache.org/jira/browse/SPARK-14649), but there were a bunch of subtleties in getting that right, so for now effort on that has stopped.  If someone wants to re-start the effort on that, it seems useful, but I think should be de-coupled from fixing this bug.
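
The interaction discussed in this comment can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyShuffleStage`, `ToyScheduler`, `Completion`, and `ToySchedulerDemo` are hypothetical names invented for the illustration, not Spark classes, and the logic is a simplified reading of the epoch check described in this thread. It shows how a late success from an executor already marked as failed can empty `pendingPartitions` while the stage's outputs are still incomplete.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for scheduler state; not Spark's real classes.
final case class Completion(partitionId: Int, execId: String, epoch: Long)

final class ToyShuffleStage(numPartitions: Int) {
  val pendingPartitions: mutable.Set[Int] = mutable.Set(0 until numPartitions: _*)
  val outputLocs: mutable.Map[Int, String] = mutable.Map.empty
  def isAvailable: Boolean = outputLocs.size == numPartitions
}

final class ToyScheduler(stage: ToyShuffleStage) {
  // Highest epoch at which each executor was marked as failed.
  val failedEpoch: mutable.Map[String, Long] = mutable.Map.empty

  /** Returns true if the map output was registered, false if it was ignored. */
  def handleSuccess(c: Completion): Boolean = {
    // The partition stops being "pending" before the epoch check runs ...
    stage.pendingPartitions -= c.partitionId
    // ... but a result from an executor that failed at or after the task's
    // epoch is treated as possibly bogus, so its output is not recorded.
    val bogus = failedEpoch.get(c.execId).exists(c.epoch <= _)
    if (!bogus) stage.outputLocs(c.partitionId) = c.execId
    !bogus
  }
}

object ToySchedulerDemo {
  /** Returns (pendingPartitions is empty, all outputs are available). */
  def run(): (Boolean, Boolean) = {
    val stage = new ToyShuffleStage(numPartitions = 2)
    val scheduler = new ToyScheduler(stage)
    scheduler.failedEpoch("execA") = 1L                 // execA marked lost at epoch 1
    scheduler.handleSuccess(Completion(1, "execA", 0L)) // late success, ignored
    scheduler.handleSuccess(Completion(0, "execB", 2L)) // retry succeeds elsewhere
    (stage.pendingPartitions.isEmpty, stage.isAvailable)
  }
}
```

In this model `ToySchedulerDemo.run()` ends with no pending partitions but an unavailable stage, which is the state in which a resubmission could collide with a task set that is still running.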


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72849 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72849/testReport)** for PR 16620 at commit [`ab8d13e`](https://github.com/apache/spark/commit/ab8d13efaf12182517d3b311d74b2f0a8d2fbef8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72797/
    Test PASSed.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Thanks for pointing out this issue, and the nice description.  Still looking but sounds like a legitimate issue.  I think `SchedulerIntegrationSuite` should be able to replicate the exact scenario you have outlined for adding a test case.  @jinxing64 can you look at adding a test case that way?  I can try to help there as well, but will take me a few days to get to it.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72497 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72497/testReport)** for PR 16620 at commit [`a02dd5c`](https://github.com/apache/spark/commit/a02dd5cc30c7c1999717ba06bc2e9adbdd020fea).


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.


[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r97139832
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1193,7 +1193,15 @@ class DAGScheduler(
                 }
     
                 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    -              markStageAsFinished(shuffleStage)
    +              val activeTaskSetManagerExist =
    --- End diff --
    
    nit: should be `activeTaskSetManagerExists`


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    LGTM


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Jenkins, ok to test


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @kayousterhout don't overestimate my enthusiasm for my own suggestion.  I'm really just thinking aloud in search of a solution, and I agree with you that the TaskSetManager and DAGScheduler being in disagreement is not good.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72226 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72226/testReport)** for PR 16620 at commit [`ed1791f`](https://github.com/apache/spark/commit/ed1791fd9b6e434ec69a6c118a433e2539fed7a4).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/71874/
    Test PASSed.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72500 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72500/testReport)** for PR 16620 at commit [`ece3d01`](https://github.com/apache/spark/commit/ece3d01f7a77144ec8a543ab025c87f12739b3ac).


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Beyond the lack of new tests, this patch is causing a couple of existing DAGSchedulerSuite tests to fail for me locally: "run trivial shuffle with out-of-band failure and retry" and "map stage submission with executor failure late map task completions"


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72401 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72401/testReport)** for PR 16620 at commit [`6547773`](https://github.com/apache/spark/commit/654777345a58acad382f241502ff165c7a34dbe6).


[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r97406162
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1218,7 +1225,9 @@ class DAGScheduler(
                     logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
                       ") because some of its tasks had failed: " +
                       shuffleStage.findMissingPartitions().mkString(", "))
    -                submitStage(shuffleStage)
    +                if (noActiveTaskSetManager) {
    --- End diff --
    
    shouldn't this condition go into the surrounding `if (!shuffleStage.isAvailable)` ?  the logInfo is very confusing in this case otherwise.
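
One possible reading of this suggestion, sketched as standalone code. The names `Action`, `Resubmit`, `WaitForActiveTaskSet`, `ProceedToNextStages`, and `ResubmitDecision.decide` are hypothetical and exist only for this illustration; the real change lives inside `DAGScheduler` and may be structured differently. The point is that the "Resubmitting" log line is emitted only on the branch that actually resubmits.

```scala
// Hypothetical outcome type used only for this illustration.
sealed trait Action
case object Resubmit extends Action
case object WaitForActiveTaskSet extends Action
case object ProceedToNextStages extends Action

object ResubmitDecision {
  /**
   * Decide what to do once a shuffle stage has no pending partitions.
   * Folding the task-set check into the availability check keeps the
   * log message from claiming a resubmission that is then skipped.
   */
  def decide(
      isAvailable: Boolean,
      noActiveTaskSetManager: Boolean,
      log: String => Unit): Action = {
    if (!isAvailable && noActiveTaskSetManager) {
      log("Resubmitting stage because some of its tasks had failed")
      Resubmit
    } else if (!isAvailable) {
      // Outputs are missing, but a task set for this stage is still running.
      WaitForActiveTaskSet
    } else {
      ProceedToNextStages
    }
  }
}
```

With this shape, a call such as `ResubmitDecision.decide(isAvailable = false, noActiveTaskSetManager = false, println)` logs nothing and returns `WaitForActiveTaskSet`.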


[GitHub] spark issue #16620: [WIP][SPARK-19263] DAGScheduler should avoid sending con...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test FAILed.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72227/
    Test PASSed.


[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r100921653
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    +    " even with late completions from earlier stage attempts") {
    --- End diff --
    
    nit: indent two more spaces


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    After thinking about this for a few more minutes, I'm going to retract my earlier statement about preferring my approach to yours.  I think we can file a JIRA for the bigger problem of inconsistent state between the different components -- but there's no reason to force this PR to fix that bigger scheduler issue.  Your approach (or the alternative I proposed immediately above) surgically fixes the problem, and I think it's good to merge that bug-fix separately from a more significant re-thinking of the logic here.


[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/16620


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72160/
    Test PASSed.


[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72797 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72797/testReport)** for PR 16620 at commit [`46ef5a3`](https://github.com/apache/spark/commit/46ef5a369902ce2ca8c0dfde64b973647f5fffeb).


[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r100921953
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    +    " even with late completions from earlier stage attempts") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostA", 2))))
    +
    +    // Fetch failed on hostA.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
    +        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
    +      null))
    +
    +    scheduler.resubmitFailedStages()
    +
    +    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
    +    complete(taskSets(2), Seq(
    +      (Success, makeMapStatus("hostB", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // Task succeeds on a failed executor. The success is bogus.
    --- End diff --
    
    can you change the 2nd sentence to "The success should be ignored because the task started before the executor failed, so the output may have been lost."




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
     Merged build triggered.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r100921788
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    +    " even with late completions from earlier stage attempts") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostA", 2))))
    +
    +    // Fetch failed on hostA.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
    +        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
    +      null))
    --- End diff --
    
    can you pass null as a named parameter here? ("parameterName = null")




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72160 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72160/testReport)** for PR 16620 at commit [`283373d`](https://github.com/apache/spark/commit/283373d722629681929ed9dd059a6cd22be1fb73).




[GitHub] spark issue #16620: [WIP][SPARK-19263] DAGScheduler should avoid sending con...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test FAILed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72913 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72913/testReport)** for PR 16620 at commit [`d225565`](https://github.com/apache/spark/commit/d2255654b1f6ae43ba47c0ffcec0e6adc4beed82).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @squito 
    Would you please take another look at this? Please give some advice if possible and I can continue working on this : )




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72497 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72497/testReport)** for PR 16620 at commit [`a02dd5c`](https://github.com/apache/spark/commit/a02dd5cc30c7c1999717ba06bc2e9adbdd020fea).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72500 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72500/testReport)** for PR 16620 at commit [`ece3d01`](https://github.com/apache/spark/commit/ece3d01f7a77144ec8a543ab025c87f12739b3ac).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r97139668
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1193,7 +1193,15 @@ class DAGScheduler(
                 }
     
                 if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    -              markStageAsFinished(shuffleStage)
    +              val activeTaskSetManagerExist =
    +                if (taskScheduler.rootPool != null) {
    +                  taskScheduler.rootPool.getSortedTaskSetQueue.exists {
    +                    tsm => tsm.stageId == stageId && !tsm.isZombie
    +                  }
    +                } else false
    --- End diff --
    
    The `if...else` is unnecessary:
    ```scala
    val activeTaskSetManagerExist =
      taskScheduler.rootPool != null &&
      taskScheduler.rootPool.getSortedTaskSetQueue.exists { tsm =>
        tsm.stageId == stageId && !tsm.isZombie
      }
    ```
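
    A minimal, self-contained sketch of the equivalence behind this suggestion. `Tsm`, `Pool`, and `activeTaskSetManagerExists` are hypothetical stand-ins assumed for illustration, not Spark's actual classes; the only point is that `&&` short-circuits, so the `exists` call is never reached when the pool is `null`:

    ```scala
    // Hypothetical stand-ins for TaskSetManager and the scheduler's root pool.
    final case class Tsm(stageId: Int, isZombie: Boolean)
    final case class Pool(queue: Seq[Tsm])

    object ActiveTaskSetCheck {
      // `&&` short-circuits: when rootPool is null the right-hand side is never
      // evaluated, so no explicit `if ... else false` is needed.
      def activeTaskSetManagerExists(rootPool: Pool, stageId: Int): Boolean =
        rootPool != null &&
          rootPool.queue.exists { tsm => tsm.stageId == stageId && !tsm.isZombie }
    }
    ```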




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72912 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72912/testReport)** for PR 16620 at commit [`e34cd85`](https://github.com/apache/spark/commit/e34cd85424d88daf96d0252d34f2ce28b956ddde).




[GitHub] spark issue #16620: [WIP][SPARK-19263] DAGScheduler should avoid sending con...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72776 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72776/testReport)** for PR 16620 at commit [`3a5d60d`](https://github.com/apache/spark/commit/3a5d60d74b8e37966a859d5d02b74aefb7cbee4f).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Jenkins, test this please




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99661979
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
    @@ -648,4 +661,70 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         }
         assertDataStructuresEmpty(noFailure = false)
       }
    +
    +  testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") {
    +    val a = new MockRDD(sc, 2, Nil)
    +    val b = shuffle(2, a)
    +    val shuffleId = b.shuffleDeps.head.shuffleId
    +
    +    def runBackend(): Unit = {
    +      val (taskDescription, task) = backend.beginTask()
    +      task.stageId match {
    +        // ShuffleMapTask
    +        case 0 =>
    +          val stageAttempt = task.stageAttemptId
    +          val partitionId = task.partitionId
    +          (stageAttempt, partitionId) match {
    +            case (0, 0) =>
    +              val fetchFailed = FetchFailed(
    +                DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")
    +              backend.taskFailed(taskDescription, fetchFailed)
    +            case (0, 1) =>
    +              // Wait until stage resubmission caused by FetchFailed is finished.
    +              waitForCondition(taskScheduler.runningTaskSets.size == 2, 5000,
    +                "Wait until stage is resubmitted caused by fetch failed")
    +
    +              // Task(stageAttempt=0, partition=1) will be bogus, because both two
    +              // tasks(stageAttempt=0, partition=0, 1) run on hostA.
    +              // Pending partitions are (0, 1) after stage resubmission,
    +              // then change to be 0 after this bogus task.
    --- End diff --
    
    This comment is confusing to me, because I'm not sure what "bogus" means in this context.  I think this comment is trying to say that the completion of this task should be ignored, because it's from an old stage / epoch.  If that's correct, can you change the comment to that?
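
    To make "old epoch" concrete, here is a simplified model of the check the PR description refers to (task epoch `<= failedEpoch(execId)`). `Completion`, `EpochCheck`, and `shouldIgnore` are illustrative names assumed for this sketch, not Spark's API:

    ```scala
    // Illustrative model: a successful completion reported by an executor,
    // tagged with the epoch at which its task was launched.
    final case class Completion(partitionId: Int, execId: String, epoch: Long)

    object EpochCheck {
      // failedEpoch maps an executor id to the epoch at which it was marked failed.
      // A completion is ignored when its task was launched at or before that
      // epoch, because the map output it reports may already be lost.
      def shouldIgnore(failedEpoch: Map[String, Long], c: Completion): Boolean =
        failedEpoch.get(c.execId).exists(failed => c.epoch <= failed)
    }
    ```

    Under this model, a completion from an executor with no recorded failure is always accepted, and one from a failed executor is accepted only if its task was launched in a later epoch.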




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72231 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72231/testReport)** for PR 16620 at commit [`db354c7`](https://github.com/apache/spark/commit/db354c79eadbfe177291e14e9d020234b7cfd1c5).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72776 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72776/testReport)** for PR 16620 at commit [`3a5d60d`](https://github.com/apache/spark/commit/3a5d60d74b8e37966a859d5d02b74aefb7cbee4f).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build started.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72401/




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r98472355
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -718,6 +703,21 @@ private[spark] class TaskSetManager(
             " because task " + index + " has already completed successfully")
         }
         maybeFinishTaskSet()
    +    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
    +    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
    +    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
    +    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
    +    // Note: "result.value()" only deserializes the value when it's called at the first time, so
    +    // here "result.value()" just returns the value and won't block other threads.
    +    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
    +    // Kill any other attempts for the same task (since those are unnecessary now that one
    +    // attempt completed successfully).
    +    for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
    +      logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
    +        s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
    +        s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
    +      sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
    +    }
    --- End diff --
    
    could this be moved before `maybeFinishTaskSet()`, if you only need `isZombie=true`?  for performance it's helpful to hand off to the DAGScheduler thread as soon as we can.  Probably not a huge impact, but we should try to avoid impacting performance where possible.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72377/




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72498 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72498/testReport)** for PR 16620 at commit [`66686a7`](https://github.com/apache/spark/commit/66686a78a42def7c6777e464441af01edbd58606).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72500/




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @kayousterhout @squito @markhamstra 
    Thanks a lot for reviewing this PR so far. I do think the approach, which throws away task results from earlier attempts that ran on failed executors and treats `stage.pendingPartitions` as an exact mirror (in reverse) of the output locations for the stage, can really fix this bug and make the code clearer.
    But my previous understanding of `stage.pendingPartitions` is a little different, as described in the comment in `Stage`:
    ```
      /**
       * Partitions the [[DAGScheduler]] is waiting on before it tries to mark the stage / job as
       * completed and continue. Tasks' successes in both the active taskset or earlier attempts
       * for this stage can cause partition ids get removed from pendingPartitions. Finally, note
       * that when this is empty, it does not necessarily mean that stage is completed -- Some of
       * the map output from that stage may have been lost. But the [[DAGScheduler]] will check for
       * this condition and resubmit the stage if necessary.
       */
    ```
    Any task's success can cause its partition to be removed from `pendingPartitions`, whether it comes from a valid executor or a failed one. Thus, when `pendingPartitions` becomes empty, we check whether all of the stage's output locations are available and, if not, resubmit.
    
    If we take `stage.pendingPartitions` as an exact mirror (in reverse) of the output locations, some unit tests in `DAGSchedulerSuite` cannot pass (e.g. `"run trivial shuffle with out-of-band failure and retry"`). Consider the following:
    1. A stage has ShuffleMapTask1 and ShuffleMapTask2, `pendingPartitions`=(0, 1);
    2. ShuffleMapTask1 succeeds on executorA and its result returns to the driver, `pendingPartitions`=(1);
    3. ShuffleMapTask2 succeeds on executorA;
    4. The driver hears that executorA is lost;
    5. ShuffleMapTask2's success returns to the driver; `pendingPartitions` is still (1), so the stage cannot get rescheduled.
    
    In my understanding, `pendingPartitions` helps us track the progress of the `TaskSetManager`: it tells us whether there are still task results on the way that are worth waiting for, and when to check whether the output locations are all available and whether to resubmit.
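    
    To make the five steps above concrete, here is a minimal toy model of the two interpretations. It is only a sketch: `PendingPartitionsSketch`, `Outcome`, and `run` are hypothetical names, not Spark classes, and the epoch check is simplified to a set of lost executors.
    
    ```scala
    import scala.collection.mutable
    
    // Toy model only: none of these names exist in Spark.
    object PendingPartitionsSketch {
      final case class Outcome(pending: Set[Int], available: Set[Int], resubmit: Boolean)
    
      // mirrorOutputs = true : a partition leaves `pending` only when its output is registered.
      // mirrorOutputs = false: a partition leaves `pending` on every Success.
      def run(mirrorOutputs: Boolean): Outcome = {
        val pending = mutable.Set(0, 1)
        val outputs = mutable.Map.empty[Int, String] // partition -> executor
        var lostExecutors = Set.empty[String]        // stand-in for the failedEpoch check
    
        def onSuccess(partition: Int, exec: String): Unit = {
          val bogus = lostExecutors.contains(exec)
          if (!bogus) outputs(partition) = exec
          if (!mirrorOutputs || !bogus) pending -= partition
        }
    
        def onExecutorLost(exec: String): Unit = {
          lostExecutors += exec
          val lostPartitions = outputs.collect { case (p, e) if e == exec => p }.toList
          outputs --= lostPartitions
        }
    
        onSuccess(0, "executorA")   // step 2
        onExecutorLost("executorA") // step 4
        onSuccess(1, "executorA")   // step 5: the Success arrives after the loss
    
        // The availability check only runs once nothing is pending.
        val resubmit = pending.isEmpty && outputs.size < 2
        Outcome(pending.toSet, outputs.keySet.toSet, resubmit)
      }
    }
    ```
    
    In this model `run(mirrorOutputs = true)` ends with partition 1 still pending and no resubmission, while `run(mirrorOutputs = false)` empties `pending` and triggers the resubmit check.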
    
    





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Hi @jinxing64 
    
    I'm sorry I haven't had time to look again.  So the one big concern I had was still that test case -- I know you fixed up some of the things I complained about, but I still think it should probably be in `DAGSchedulerSuite`.  I was hoping I would be able to help out by trying to write that test case myself, but maybe you could do that?  I think it's fine if you have to make `MockTaskScheduler` replicate the behavior of failing when it receives conflicting task sets.  Maybe it really can't be done for some reason I don't see yet.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #71979 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71979/testReport)** for PR 16620 at commit [`3f0ebb8`](https://github.com/apache/spark/commit/3f0ebb826985a35a90584383ccd605221d0f9c43).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @squito 
    ping for review~~




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99615449
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,96 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should avoid sending conflicting task set") {
    +    val mockTaskSchedulerImpl = new TaskSchedulerImpl(sc) {
    +      override def submitTasks(taskSet: TaskSet): Unit = {
    +        super.submitTasks(taskSet)
    +        taskSets += taskSet
    +      }
    +    }
    +    val mockDAGScheduler = new DAGScheduler(
    +      sc,
    +      mockTaskSchedulerImpl,
    +      sc.listenerBus,
    +      mapOutputTracker,
    +      blockManagerMaster,
    +      sc.env
    +    ) {
    +      override def taskEnded(
    +                              task: Task[_],
    +                              reason: TaskEndReason,
    +                              result: Any,
    +                              accumUpdates: Seq[AccumulatorV2[_, _]],
    +                              taskInfo: TaskInfo): Unit = {
    --- End diff --
    
    fix indentation




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @squito 
    `SchedulerIntegrationSuite` is very helpful. I like it very much, and I can reproduce this issue in `SchedulerIntegrationSuite` now.
    Fixing this issue is more complicated than I thought; I'll make some changes and create a unit test.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72028/
    Test PASSed.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99615141
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -14,7 +14,7 @@
      * See the License for the specific language governing permissions and
      * limitations under the License.
      */
    -
    +// scalastyle:off
    --- End diff --
    
    remove
    I know this was probably an accidental addition :)




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72231 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72231/testReport)** for PR 16620 at commit [`db354c7`](https://github.com/apache/spark/commit/db354c79eadbfe177291e14e9d020234b7cfd1c5).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72401 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72401/testReport)** for PR 16620 at commit [`6547773`](https://github.com/apache/spark/commit/654777345a58acad382f241502ff165c7a34dbe6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @mridulm yeah once I saw this it seemed like something that's probably been a lurking issue for a bunch of jobs!!  Will be great to get this fixed -- thanks for finding it @jinxing64!




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72227 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72227/testReport)** for PR 16620 at commit [`76961c3`](https://github.com/apache/spark/commit/76961c3ba64e19c43ebfc0b18651d68c54949edb).




[GitHub] spark issue #16620: [WIP][SPARK-19263] DAGScheduler should avoid sending con...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72757/
    Test FAILed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72498 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72498/testReport)** for PR 16620 at commit [`66686a7`](https://github.com/apache/spark/commit/66686a78a42def7c6777e464441af01edbd58606).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Thanks for all the investigation and the write-up, @kayousterhout. This makes good sense to me, and should take us a long way toward both fixing the immediate bug and improving the code. We should also make sure that our intentions and understanding get preserved in documentation that is more obvious and accessible in the future than PR discussion threads. Probably more comments in the source code that cover the essence of your "very long write up", but maybe we should consider creating an external documentation page (wiki or something) that covers in long form what we know and intend; then we can scale down the in-code comments to a shorter form that includes pointers to the long form.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @kayousterhout yes, I also looked at duplicating `stage.pendingPartitions -= task.partitionId`.  I could live with that.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    tl;dr I don't think Mark's change is quite correct, which is why the tests were failing.  Instead, I think we need to replace the failedEpoch if/else statement and the pendingPartitions update in DAGScheduler.handleTaskCompletion with:
    
    `if (stageIdToStage(task.stageId).latestInfo.attemptId == task.stageAttemptId) {
      // This task was for the currently running attempt of the stage. Since the task
      // completed successfully from the perspective of the TaskSetManager, mark it as
      // no longer pending (the TaskSetManager may consider the task complete even
      // when the output needs to be ignored because the task's epoch is too small below).
      shuffleStage.pendingPartitions -= task.partitionId
    }
    
    if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
      logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
    } else {
      // The epoch of the task is acceptable (i.e., the task was launched after the most
      // recent failure we're aware of for the executor), so mark the task's output as
      // available.
      shuffleStage.addOutputLoc(smt.partitionId, status)
      // Remove the task's partition from pending partitions.  This may have already been
      // done above, but will not have been done yet in cases where the task attempt was
      // from an earlier attempt of the stage (i.e., not the attempt that's currently
      // running).  This allows the DAGScheduler to mark the stage as complete when one
      // copy of each task has finished successfully, even if the currently active stage
      // still has tasks running.
      shuffleStage.pendingPartitions -= task.partitionId
    }
    `
    
    I submitted #16892 to attempt to clarify the test case where Mark's change originally failed (this PR shouldn't block on that -- that's just to clarify things for ourselves in the future), and also wrote a very long write up of what's going on below.
    
    -----
    
    There are three relevant pieces of state to consider here:
    
    (1) The tasks that the TaskSetManager (TSM) considers currently pending.  The TSM encodes these pending tasks in its "successful" array.  When a task set is launched, all of its tasks are considered pending, and all of the entries in the successful array are False.  Tasks are no longer considered pending (and are marked as True in the "successful" array) if either (a) a copy of the task finishes successfully or (b) a copy of the task fails with a fetch failed (in which case the TSM assumes that the task will never complete successfully, because the previous stage needs to be re-run).  Additionally, a task that previously completed successfully can be re-marked as pending if the stage is a shuffle map stage, and the executor where the task ran died (this is because the map output needs to be re-generated, and the TSM will re-schedule the task).
    
    The TSM notifies the DAGScheduler that the stage has completed if either (a) the stage fails (e.g., there's a fetch failure) or (b) all of the entries in "successful" are true (i.e., there are no more pending tasks).
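    
    As a rough illustration of the bookkeeping described in (1), here is a toy stand-in for the "successful" array. It is only a sketch under the assumptions stated in the text above: `TsmPendingSketch` and `ToyTaskSet` are hypothetical names, and the real TaskSetManager tracks far more state than this.
    
    ```scala
    // Toy model only: this is not Spark's TaskSetManager.
    object TsmPendingSketch {
      final class ToyTaskSet(numTasks: Int, isShuffleMapStage: Boolean) {
        private val successful = Array.fill(numTasks)(false)
        private val ranOn = Array.fill[Option[String]](numTasks)(None)
        private var failed = false
    
        // (a) A copy of the task finished successfully.
        def taskSucceeded(index: Int, exec: String): Unit = {
          successful(index) = true
          ranOn(index) = Some(exec)
        }
    
        // (b) A copy of the task hit a fetch failure: it is no longer pending,
        // and the task set as a whole is considered failed.
        def taskFetchFailed(index: Int): Unit = {
          successful(index) = true
          failed = true
        }
    
        // For shuffle map stages, tasks that ran on a dead executor become pending again.
        def executorLost(exec: String): Unit =
          if (isShuffleMapStage) {
            for (i <- successful.indices if ranOn(i).contains(exec)) {
              successful(i) = false
              ranOn(i) = None
            }
          }
    
        def pending: Seq[Int] = successful.indices.filterNot(successful(_))
    
        // The DAGScheduler is told the stage is done when it failed or nothing is pending.
        def shouldNotifyDagScheduler: Boolean = failed || successful.forall(identity)
      }
    }
    ```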
    
    (2) ShuffleMapStage.pendingPartitions.  This variable is used by the DAGScheduler to track the pending tasks for a stage, and mostly is consistent with the TSM's pending tasks (described above).  When a stage begins, the DAGScheduler marks all of the partitions that need to be computed as pending, and then removes them from pendingPartitions as the TSM notifies the DAGScheduler that tasks have successfully completed.  When a TSM determines that a task needs to be re-run (because it's a shuffle map task that ran on a now-dead executor), the TSM sends a Resubmitted task completion event to the DAGScheduler, which causes the DAGScheduler to re-add the task to pendingPartitions (in doing so, the DAGScheduler is keeping pendingPartitions consistent with the TSM's pending tasks).
    
    I believe there are two scenarios (currently) where ShuffleMapStage.pendingPartitions and the TSM's pending tasks become inconsistent: 
    -Scenario A (performance optimization, as discussed here already): This happens if a ShuffleMapStage gets re-run (e.g., because the first time it ran, it encountered a fetch failure, so the previous stage needed to be re-run to generate the missing output).  Call the original attempt #0 and the currently running attempt #1.  If there's a task from attempt #0 that's still running, and it is running on an executor that *was not* marked as failed (this is the condition captured by the failedEpoch if-statement), and it completes successfully, this event will be handled by the TSM for attempt #0.  When the DAGScheduler hears that the task completed successfully, it will remove it from pendingPartitions (even though there's still a running copy of this task in the TSM for attempt #1, which is the currently active attempt).  This allows the DAGScheduler to mark the stage as finished earlier than when the TSM thinks that the stage is finished.
    -Scenario B (bug, as discussed): This happens in the same case as Scenario A, except that it's when a task from attempt #0 completes successfully, but it's on an executor that *was* marked as failed (again, this is the failedEpoch if-statement).  In this case, the DAGScheduler considers the output "bogus" (because the executor has since been lost, so the output is probably gone), but the DAGScheduler still removes the task from pendingPartitions.  This can cause the DAGScheduler to determine that the stage is complete (the shuffleStage.pendingPartitions.isEmpty if-statement), even though there's still another running copy of that task (in the TSM for attempt #1) that could complete successfully.  The DAGScheduler will notice an output is missing ("if !shuffleStage.isAvailable") and re-submit the stage, leading to an exception being thrown, because there's still an active TaskSetManager.  This is the root cause of the bug here, and is fixed by the proposed code above.
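    
    Scenario B can be replayed in a small toy model that contrasts the existing handling with the handling proposed at the top of this comment. This is a sketch only: `ScenarioBSketch`, `ToyStage`, `Completion`, and `handleSuccess` are hypothetical names, and the epoch values are made up for illustration.
    
    ```scala
    import scala.collection.mutable
    
    // Toy model only: these are not Spark's DAGScheduler or ShuffleMapStage.
    object ScenarioBSketch {
      final case class Completion(partition: Int, stageAttempt: Int, exec: String, epoch: Long)
    
      final class ToyStage(numPartitions: Int) {
        val pendingPartitions = mutable.Set.empty[Int]
        val outputLocs = mutable.Map.empty[Int, String]
        var latestAttempt = 0
        def isAvailable: Boolean = outputLocs.size == numPartitions
      }
    
      def handleSuccess(
          stage: ToyStage,
          failedEpoch: Map[String, Long],
          c: Completion,
          proposed: Boolean): Unit = {
        // Output is "bogus" if the task started before the executor's last known failure.
        val bogus = failedEpoch.get(c.exec).exists(c.epoch <= _)
        if (proposed) {
          if (c.stageAttempt == stage.latestAttempt) stage.pendingPartitions -= c.partition
          if (!bogus) {
            stage.outputLocs(c.partition) = c.exec
            stage.pendingPartitions -= c.partition
          }
        } else {
          // Existing behavior: always drop the partition, even for bogus output.
          stage.pendingPartitions -= c.partition
          if (!bogus) stage.outputLocs(c.partition) = c.exec
        }
      }
    
      // True if the stage would be resubmitted while attempt #1 still has a running task.
      def conflictingResubmit(proposed: Boolean): Boolean = {
        val stage = new ToyStage(2)
        stage.latestAttempt = 1
        stage.pendingPartitions ++= Seq(0, 1)
        val failedEpoch = Map("executorA" -> 5L)
        // A task from attempt #0 finishes late on the failed executorA.
        handleSuccess(stage, failedEpoch, Completion(1, 0, "executorA", 3L), proposed)
        // A task from attempt #1 finishes on executorB; partition 1 is still running there.
        handleSuccess(stage, failedEpoch, Completion(0, 1, "executorB", 6L), proposed)
        stage.pendingPartitions.isEmpty && !stage.isAvailable
      }
    }
    ```
    
    In this model the existing handling empties pendingPartitions with an output still missing, which is the resubmit-while-active condition; with the proposed handling partition 1 stays pending until attempt #1's copy finishes.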
    
    (3) ShuffleMapStage.outputLocs.  This tracks the output locations for all of the tasks in a stage.  If a stage gets re-run, only the tasks that need to be re-run will be in the two variables above, but all of the tasks in the stage (including ones that have finished) will always be in outputLocs.  One use of this variable that's different than the others is that outputLocs can be used after a stage completes and when no tasks are actively running.  For example, if a task in the next stage fails with a fetch failure, the output location for the data that triggered the failure will be removed from ShuffleMapStage.outputLocs.  outputLocs also may track multiple locations for a particular task (e.g., if two copies of the task completed successfully).
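    
    A minimal sketch of the outputLocs behavior described in (3), assuming one list of locations per partition. `OutputLocsSketch` and `ToyOutputLocs` are hypothetical names, and executors are plain strings here rather than real map statuses.
    
    ```scala
    // Toy model only: this is not Spark's ShuffleMapStage.
    object OutputLocsSketch {
      final class ToyOutputLocs(numPartitions: Int) {
        // One entry per partition; a partition may have several locations.
        private val locs = Array.fill(numPartitions)(List.empty[String])
    
        def add(partition: Int, exec: String): Unit =
          locs(partition) = exec :: locs(partition)
    
        // E.g. called when a downstream task reports a fetch failure for this output.
        def remove(partition: Int, exec: String): Unit =
          locs(partition) = locs(partition).filterNot(_ == exec)
    
        // The stage's output is usable only if every partition has at least one location.
        def isAvailable: Boolean = locs.forall(_.nonEmpty)
    
        def missing: Seq[Int] = locs.indices.filter(locs(_).isEmpty)
      }
    }
    ```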
    
    As far as I understand, this will be inconsistent with pendingPartitions in two cases:
    - (A) Consider a task X that finishes successfully on an executor E.  Now suppose executor E gets marked as lost (e.g., because of another task that failed to fetch data from E), causing the DAGScheduler to update the epoch on E, and the TaskSetManager to mark X as Resubmitted.  Sometime after executor E is marked as lost, the TaskSetManager processes the task successful message for X.  The TaskSetManager still considers X to be done and marks it as successful, and the DAGScheduler removes the task from ShuffleMapStage.pendingPartitions.  However, because the epoch on the machine is too old (in other words, because the DAGScheduler knows that executor E failed sometime after task X started), the DAGScheduler won't register the output location for the task.  This particular functionality is necessary for correctness, and will trigger the "if !shuffleStage.isAvailable" statement.  Task X needs to be re-run, and the TSM doesn't "know" that X needs to be re-run (it thinks X completed successfully).  If the DAGScheduler didn't remove X from pendingPartitions, things would end up hanging, as @jinxing64 pointed out.  This is the test case I improved in #16892.
    - (B) In buggy scenario B above (which should get fixed by this PR).
    
    There are some more fixes we could do to clean some of this up and make it easier to reason about -- but in the immediate future, I think the fix at the top is the best way to fix the current bug.





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72776/
    Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72377 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72377/testReport)** for PR 16620 at commit [`e7cbea0`](https://github.com/apache/spark/commit/e7cbea014783a4582c5cfdb40059a0f61910e9c8).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @kayousterhout That is clearer, and I can see this being a problem (it probably explains some hung jobs I saw a while ago). Thanks!




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r97417513
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
    @@ -648,4 +648,69 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         }
         assertDataStructuresEmpty(noFailure = false)
       }
    +
    +  testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") {
    +    val a = new MockRDD(sc, 2, Nil)
    +    val b = shuffle(2, a)
    +    val shuffleId = b.shuffleDeps.head.shuffleId
    +
    +    def runBackend(): Unit = {
    +      val (taskDescription, task) = backend.beginTask()
    +      task.stageId match {
    +        // ShuffleMapTask
    +        case 0 =>
    +          val stageAttempt = task.stageAttemptId
    +          val partitionId = task.partitionId
    +          (stageAttempt, partitionId) match {
    +            case (0, 0) =>
    +              val fetchFailed = FetchFailed(
    +                DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")
    +              backend.taskFailed(taskDescription, fetchFailed)
    +            case (0, 1) =>
    +              // Wait until stage resubmission caused by FetchFailed is finished.
    +              waitUntilConditionBecomeTrue(taskScheduler.runningTaskSets.size==2, 5000,
    +                "Wait until stage is resubmitted caused by fetch failed")
    +
    +              // Task(stageAttempt=0, partition=1) will be bogus, because both two
    +              // tasks(stageAttempt=0, partition=0, 1) run on hostA.
    +              // Pending partitions are (0, 1) after stage resubmission,
    +              // then change to be 0 after this bogus task.
    +              backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostA", 2))
    +            case (1, 1) =>
    +              // Wait long enough until Success of task(stageAttempt=1 and partition=0)
    +              // is handled by DAGScheduler.
    +              Thread.sleep(5000)
    +              // Task(stageAttempt=1 and partition=0) will cause stage resubmission,
    +              // because shuffleStage.pendingPartitions.isEmpty,
    +              // but shuffleStage.isAvailable is false.
    +              backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2))
    +            case _ =>
    +              backend.taskSuccess(taskDescription, DAGSchedulerSuite.makeMapStatus("hostB", 2))
    +          }
    +        // ResultTask
    +        case 1 => backend.taskSuccess(taskDescription, 10)
    +      }
    +    }
    +
    +    withBackend(runBackend _) {
    +      val jobFuture = submit(b, (0 until 2).toArray)
    +      val duration = Duration(15, SECONDS)
    +      awaitJobTermination(jobFuture, duration)
    +    }
    +    assert(results === (0 until 2).map { _ -> 10}.toMap)
    +  }
    +
    +  def waitUntilConditionBecomeTrue(condition: => Boolean, timeout: Long, msg: String): Unit = {
    --- End diff --
    
    nit: rename to `waitForCondition` (maybe irrelevant given other comments)
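
A minimal sketch of what a polling helper with the suggested name might look like. This is an illustration only, not the code from the PR: the body, the 10 ms polling interval, the `WaitSketch` wrapper object, and the exception type are all assumptions.

```scala
object WaitSketch {
  /**
   * Hypothetical helper: re-evaluates `condition` until it is true,
   * or throws once `timeoutMs` milliseconds have elapsed.
   */
  def waitForCondition(condition: => Boolean, timeoutMs: Long, msg: String): Unit = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (!condition) {
      if (System.currentTimeMillis() > deadline) {
        throw new IllegalStateException(s"Timed out after $timeoutMs ms: $msg")
      }
      // Assumed polling interval; short enough not to slow a test noticeably.
      Thread.sleep(10)
    }
  }
}
```

Because `condition` is a by-name parameter, it is re-evaluated on every loop iteration, which is what lets the caller pass an expression such as `taskScheduler.runningTaskSets.size == 2` directly.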




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72974 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72974/testReport)** for PR 16620 at commit [`6809d1f`](https://github.com/apache/spark/commit/6809d1ff5d09693e961087da35c8f6b3b50fe53c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72028 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72028/testReport)** for PR 16620 at commit [`de19333`](https://github.com/apache/spark/commit/de19333053d8ea4fb5c0708a791070323749ef86).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #3540 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/3540/testReport)** for PR 16620 at commit [`9e4aab2`](https://github.com/apache/spark/commit/9e4aab2addf2c8ed5e208938532f2fcbaf3547c0).




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r98699067
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1212,8 +1223,9 @@ class DAGScheduler(
     
                   clearCacheLocs()
     
    -              if (!shuffleStage.isAvailable) {
    -                // Some tasks had failed; let's resubmit this shuffleStage
    +              if (!shuffleStage.isAvailable && noActiveTaskSetManager) {
    --- End diff --
    
    You need to update this for mapStageJobs -- the `else` branch will now run if the shuffleStage is not available, but there is an active task set manager, which we don't want.  Also calling `submitWaitingChildStages(shuffleStage)` is confusing (though it seems to be correct).
    
    (or use the other version I suggested)
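
The concern above can be illustrated with a small decision function. This is a simplified model, not the `DAGScheduler` code: the `Action` type and the function name are hypothetical, and only the two booleans named in the diff are modelled.

```scala
object StageCompletionSketch {
  // Hypothetical outcomes once a shuffle stage has no pending partitions.
  sealed trait Action
  case object CompleteStage extends Action        // the "else" branch in the diff
  case object ResubmitStage extends Action        // the "if" branch in the diff
  case object WaitForActiveTaskSet extends Action // the case the two-way if/else misses

  def onNoPendingPartitions(isAvailable: Boolean, noActiveTaskSetManager: Boolean): Action =
    if (isAvailable) CompleteStage
    else if (noActiveTaskSetManager) ResubmitStage
    else WaitForActiveTaskSet
}
```

With a plain `if (!isAvailable && noActiveTaskSetManager) ... else ...`, the combination "not available, but a task set manager is still active" falls into the `else` branch and is treated like a finished stage; the third outcome here makes that case explicit.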




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should handle stage's pending...

Posted by markhamstra <gi...@git.apache.org>.
Github user markhamstra commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    ok to test





[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @squito 
    Thanks a lot for helping with this PR so far.
    I've added a unit test in `DAGSchedulerSuite`, but I'm not sure it is exactly what you suggested.
    I created a `mockTaskSchedulerImpl`. Since a lot of state is maintained in `TaskSchedulerImpl`, I have to trigger the events through `resourceOffers`, `handleSuccessfulTask`, and `handleFailedTask`.
    Please take another look when you have time. I'd really appreciate your help.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Merged build finished. Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @kayousterhout 
    Thanks a lot for the clear explanation. It makes great sense to me and helps me understand the logic a lot. I also think the testing approach is very good and makes the code very clear. I've already refined this PR; please take a look when the tests pass.
    Also, based on your explanation above in
    >Scenario A (performance optimization, as discussed here already): This happens if a ShuffleMapStage gets re-run (e.g., because the first time it ran, it encountered a fetch failure, so the previous stage needed to be re-run to generate the missing output). ... 
    
    I made #16901 to add a test verifying that a success from an old attempt is treated as valid and that the corresponding pending partition is removed. Please take a look.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72913/
    Test PASSed.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r98488916
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala ---
    @@ -718,6 +703,21 @@ private[spark] class TaskSetManager(
             " because task " + index + " has already completed successfully")
         }
         maybeFinishTaskSet()
    +    // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
    +    // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
    +    // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
    +    // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
    +    // Note: "result.value()" only deserializes the value when it's called at the first time, so
    +    // here "result.value()" just returns the value and won't block other threads.
    +    sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info)
    +    // Kill any other attempts for the same task (since those are unnecessary now that one
    +    // attempt completed successfully).
    +    for (attemptInfo <- taskAttempts(index) if attemptInfo.running) {
    +      logInfo(s"Killing attempt ${attemptInfo.attemptNumber} for task ${attemptInfo.id} " +
    +        s"in stage ${taskSet.id} (TID ${attemptInfo.taskId}) on ${attemptInfo.host} " +
    +        s"as the attempt ${info.attemptNumber} succeeded on ${info.host}")
    +      sched.backend.killTask(attemptInfo.taskId, attemptInfo.executorId, true)
    +    }
    --- End diff --
    
    @squito 
    Yes, it makes sense to move this part before `maybeFinishTaskSet()`. I will refine this.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/71979/
    Test FAILed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @kayousterhout @squito @markhamstra 
    
    Thanks a lot for the comments. I've already refined accordingly.
    I still have one concern:
    > If this is a correct description, I’d argue that (5) is the problem: that when ShuffleMapTask2 finishes, we should not be updating a bunch of state in the DAGScheduler saying that there’s output ready as a result. If I’m understanding correctly, there’s a relatively simple fix to this problem: In DAGScheduler.scala, in handleTaskCompletion, we should exit (and not update any state) when the task is from an earlier stage attempt that’s not the current active attempt. This can be done by changing the if-statement on line 1141 to include:
    || stageIdToStage(task.stageId).latestInfo.attemptId != task.stageAttemptId
    
    With the above, are we ignoring all results from old stage attempts?
    As @squito mentioned:
    > It also can potentially improve performance, since you may submit downstream stages more quickly, rather than waiting for all tasks in the active taskset to complete.
    
    Might it be beneficial to still accept results from old stage attempts?
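
The quoted condition can be sketched in isolation as follows. This is a simplified model under stated assumptions: `CompletedTask` and the `latestAttemptId` map are hypothetical stand-ins for the task event and for `stageIdToStage(...).latestInfo.attemptId`; nothing here is the actual `DAGScheduler` code.

```scala
object StaleAttemptSketch {
  // Hypothetical stand-in for the fields of a completed task that matter here.
  final case class CompletedTask(stageId: Int, stageAttemptId: Int, partitionId: Int)

  /**
   * True when the task belongs to a stage attempt other than the latest one.
   * `latestAttemptId` maps stageId -> latest attempt id (an assumed structure).
   * Unknown stages are treated as "not stale" in this sketch.
   */
  def isFromStaleAttempt(task: CompletedTask, latestAttemptId: Map[Int, Int]): Boolean =
    latestAttemptId.get(task.stageId).exists(_ != task.stageAttemptId)
}
```

Under this check, every completion from attempt 0 is dropped once attempt 1 exists, including successes whose output would still be usable. That is exactly the trade-off the question above raises.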




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @markhamstra I prefer that approach to the approach in the existing PR, but I still have some hesitation about that because of the inconsistencies between the TaskSetManager (which still thinks tasks are running) and DAGScheduler (which thinks the stage is done), as mentioned in my comment above.  It sounds like everyone else prefers that approach though -- perhaps we can at least add some better commenting so future readers of the code know the DAGSched and TSM will have different views of the world and that listeners may get duplicate stage completed messages as a result?
    
    Another argument for your approach is that it's *no worse* than the current code, and is the smallest change (I think) that can fix the bug.  We can fix the larger issues in a separate PR.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72231/
    Test PASSed.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #71874 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/71874/testReport)** for PR 16620 at commit [`be8bfe5`](https://github.com/apache/spark/commit/be8bfe5a8f26e417ccf1c7315eb099c3d6b233d1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r98703683
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
    @@ -648,4 +660,70 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         }
         assertDataStructuresEmpty(noFailure = false)
       }
    +
    +  testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") {
    +    val a = new MockRDD(sc, 2, Nil)
    +    val b = shuffle(2, a)
    +    val shuffleId = b.shuffleDeps.head.shuffleId
    +
    +    def runBackend(): Unit = {
    +      val (taskDescription, task) = backend.beginTask()
    +      task.stageId match {
    +        // ShuffleMapTask
    +        case 0 =>
    +          val stageAttempt = task.stageAttemptId
    +          val partitionId = task.partitionId
    +          (stageAttempt, partitionId) match {
    +            case (0, 0) =>
    +              val fetchFailed = FetchFailed(
    +                DAGSchedulerSuite.makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored")
    +              backend.taskFailed(taskDescription, fetchFailed)
    +            case (0, 1) =>
    +              // Wait until stage resubmission caused by FetchFailed is finished.
    +              waitForCondition(taskScheduler.runningTaskSets.size==2, 5000,
    --- End diff --
    
    nit: spaces around `==`
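
The state the quoted test drives the scheduler into (no pending partitions, yet the stage is not available) can be modelled roughly as below. This is a toy model following the PR description, not Spark code: `ShuffleStageModel`, `handleSuccess`, and the `bogus` flag are hypothetical names, and the flag stands in for the epoch check against `failedEpoch`.

```scala
import scala.collection.mutable

object PendingPartitionsSketch {
  final class ShuffleStageModel(val numPartitions: Int) {
    // Partitions the scheduler still expects a result for.
    val pendingPartitions: mutable.HashSet[Int] =
      mutable.HashSet.empty[Int] ++= (0 until numPartitions)
    // Partitions whose map output was actually registered.
    private val registeredOutputs = mutable.HashSet.empty[Int]

    def isAvailable: Boolean = registeredOutputs.size == numPartitions

    // Mirrors the described ordering: the partition is removed from
    // pendingPartitions first, and only afterwards is a bogus result
    // (one from an executor already marked as failed) discarded.
    def handleSuccess(partitionId: Int, bogus: Boolean): Unit = {
      pendingPartitions -= partitionId
      if (!bogus) registeredOutputs += partitionId
    }
  }
}
```

After one bogus success for partition 1 and one genuine success for partition 0, `pendingPartitions` is empty while `isAvailable` is still false, which is the condition that triggers the resubmission discussed in this thread.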




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r98819685
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1212,8 +1223,9 @@ class DAGScheduler(
     
                   clearCacheLocs()
     
    -              if (!shuffleStage.isAvailable) {
    -                // Some tasks had failed; let's resubmit this shuffleStage
    +              if (!shuffleStage.isAvailable && noActiveTaskSetManager) {
    --- End diff --
    
    Hrmm... yes, @squito, we shouldn't go to the `else` branch when the shuffleStage is not available but an active `TaskSetManager` exists.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Yes, refined : )




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72974 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72974/testReport)** for PR 16620 at commit [`6809d1f`](https://github.com/apache/spark/commit/6809d1ff5d09693e961087da35c8f6b3b50fe53c).




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Hi @jinxing64 
    sorry to go back and forth on this numerous times -- I think I have another alternative, see https://github.com/squito/spark/tree/SPARK-19263_alternate
    
    It's mostly your changes, with one main difference:  when we encounter the condition where there are no pending partitions, but there is an active taskset -- we just mark that taskset as inactive and continue as before  https://github.com/squito/spark/commit/bec061c8486a681dc16e8b92e553f79e486924e9.  I think this makes it easier to follow, as there are fewer states to keep track of.  It also can potentially improve performance, since you may submit downstream stages more quickly, rather than waiting for all tasks in the active taskset to complete.  I also think it fixes a bug in your version with mapStageJobs (I'll point it out in the code).
    
    This passes all tests in `o.a.s.scheduler.*`, including your new test case. (I did come across a race in `SchedulerIntegrationSuite`, which I fixed in https://github.com/squito/spark/commit/9125e6738269df4e0d7e6292726bad2a294c86c0; it is not directly related to these changes.)
    
    Do you see any problems w/ that approach?  
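
A rough sketch of the alternative described above, assuming a per-taskset "inactive" flag. The names `TaskSetState`, `isZombie`, and `markActiveTaskSetsInactive` are illustrative only; the linked commit may implement this differently.

```scala
object ZombieTaskSetSketch {
  // Hypothetical stand-in for a task set manager; `isZombie` models "inactive".
  final class TaskSetState(val stageAttemptId: Int) {
    var isZombie: Boolean = false
  }

  /**
   * Called when a stage has no pending partitions left: instead of waiting
   * for the remaining tasks, mark every still-active task set as inactive.
   * Returns how many task sets were changed.
   */
  def markActiveTaskSetsInactive(taskSets: Seq[TaskSetState]): Int = {
    val active = taskSets.filterNot(_.isZombie)
    active.foreach { ts => ts.isZombie = true }
    active.size
  }
}
```

Once no task set for the stage is active, a later resubmission no longer finds a conflicting active task set, and downstream stages do not have to wait for the remaining tasks of the old one.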




[GitHub] spark issue #16620: [WIP][SPARK-19263] DAGScheduler should avoid sending con...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72757 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72757/testReport)** for PR 16620 at commit [`c898148`](https://github.com/apache/spark/commit/c8981482c8c0c6e0a062babcf747f861038a1279).




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99618127
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
    @@ -648,4 +661,70 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor
         }
         assertDataStructuresEmpty(noFailure = false)
       }
    +
    +  testScheduler("[SPARK-19263] DAGScheduler shouldn't resubmit active taskSet.") {
    --- End diff --
    
    same here on the test name, perhaps "DAGScheduler should not submit multiple active tasksets, even with late completions from earlier stage attempts"




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r100922486
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -2161,6 +2161,58 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
         }
       }
     
    +  test("[SPARK-19263] DAGScheduler should not submit multiple active tasksets," +
    +    " even with late completions from earlier stage attempts") {
    +    val rddA = new MyRDD(sc, 2, Nil)
    +    val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2))
    +    val shuffleIdA = shuffleDepA.shuffleId
    +
    +    val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker)
    +    val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2))
    +
    +    val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker)
    +
    +    submit(rddC, Array(0, 1))
    +
    +    assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0)
    +    complete(taskSets(0), Seq(
    +      (Success, makeMapStatus("hostA", 2)),
    +      (Success, makeMapStatus("hostA", 2))))
    +
    +    // Fetch failed on hostA.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(0),
    +      FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0,
    +        "Fetch failure of task: stageId=1, stageAttempt=0, partitionId=0"),
    +      null))
    +
    +    scheduler.resubmitFailedStages()
    +
    +    assert(taskSets(2).stageId === 0 && taskSets(2).stageAttemptId === 1)
    +    complete(taskSets(2), Seq(
    +      (Success, makeMapStatus("hostB", 2)),
    +      (Success, makeMapStatus("hostB", 2))))
    +
    +    // Task succeeds on a failed executor. The success is bogus.
    +    runEvent(makeCompletionEvent(
    +      taskSets(1).tasks(1), Success, makeMapStatus("hostA", 2)))
    +
    +    assert(taskSets(3).stageId === 1 && taskSets(3).stageAttemptId === 1)
    +    runEvent(makeCompletionEvent(
    +      taskSets(3).tasks(0), Success, makeMapStatus("hostB", 2)))
    +
    +    // There should be no new attempt of stage submitted.
    +    assert(taskSets.size === 4)
    +    runEvent(makeCompletionEvent(
    +      taskSets(3).tasks(1), Success, makeMapStatus("hostB", 2)))
    +
    +    // ResultStage submitted.
    --- End diff --
    
    "Now the ResultStage should be submitted, because all of the tasks to generate rddB have completed successfully on alive executors."
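
The event sequence in the quoted test can be followed with a toy model. This is only a sketch under stated assumptions: `ToyShuffleStage`, `onSuccess`, and `needsResubmit` are hypothetical names invented for illustration, not Spark's API and not necessarily how the PR fixes the issue. It shows one way to keep the pending set and the registered outputs consistent, by ignoring a bogus success entirely.

```scala
import scala.collection.mutable

// Hypothetical toy model; names are illustrative and not part of Spark.
final class ToyShuffleStage(numPartitions: Int) {
  // Partitions that still have a task in flight in the current attempt.
  val pendingPartitions: mutable.Set[Int] =
    mutable.Set((0 until numPartitions): _*)

  // Partition -> host that holds its map output.
  private val outputs = mutable.Map.empty[Int, String]

  def isAvailable: Boolean = outputs.size == numPartitions

  // A bogus success (for example, from a failed executor) changes no state.
  def onSuccess(partition: Int, host: String, bogus: Boolean): Unit =
    if (!bogus) {
      outputs(partition) = host
      pendingPartitions -= partition
    }

  // A new attempt is needed only when nothing is pending but output is missing.
  def needsResubmit: Boolean = pendingPartitions.isEmpty && !isAvailable
}
```

In this model, a bogus success for partition 1 followed by a real success for partition 0 leaves partition 1 pending, so `needsResubmit` stays false while the resubmitted task for partition 1 is still running.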




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by jinxing64 <gi...@git.apache.org>.
Github user jinxing64 commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @markhamstra @squito @kayousterhout 
    It would be great if you could comment further on the above so that I can continue working on this : )




[GitHub] spark issue #16620: [WIP][SPARK-19263] DAGScheduler should avoid sending con...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/72761/




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    Thanks all for the work on this!  I've merged this into master.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72797 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72797/testReport)** for PR 16620 at commit [`46ef5a3`](https://github.com/apache/spark/commit/46ef5a369902ce2ca8c0dfde64b973647f5fffeb).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by squito <gi...@git.apache.org>.
Github user squito commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    @kayousterhout I think your fix is correct, but I actually think it's a bigger change in behavior, one that has been explicitly argued *against* in the past.  I think the idea is that if you've got a bunch of tasks completing from an old attempt for a stage, you don't want to throw all that work away, as @jinxing64 mentioned.
    
    You may have a large number of resources tied up computing tasks from a previous attempt, and the results are completely correct, but you still throw those results away.  (It's especially bad since we're [still not canceling tasks from previous attempts](https://issues.apache.org/jira/browse/SPARK-2666).) I do think the code would be simplified with the change you are suggesting -- late task completions from an earlier stage have been the cause of a number of bugs in the past.  And this all only happens when there is a fetch failure, not when everything is running smoothly.
    
    But I do think it's a rather large change in behavior which we should weigh carefully.  I was even worried that the change I was proposing would lead to some cases where tasks would get fully computed, and then the results would get thrown away, but it was necessary for correctness.
    
    Also, I should mention that I'm not even 100% sure about this -- I have to admit I find the epoch logic confusing, and perhaps a careful read will show that not much more is getting thrown away than the epoch checks were already discarding.
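
As a reading aid for the epoch logic mentioned above, here is a minimal sketch of the comparison described in the PR description (`<= failedEpoch(execId)`). `EpochCheck` and `isPossiblyBogus` are hypothetical names, and the plain `Map[String, Long]` is an assumption standing in for the scheduler's own bookkeeping; this is not Spark's code.

```scala
// Hypothetical toy model of the epoch comparison; not Spark's DAGScheduler.
object EpochCheck {
  // True when the executor was marked failed at an epoch that is at or
  // after the epoch the task was launched with, so its result is suspect.
  def isPossiblyBogus(
      failedEpoch: Map[String, Long],
      execId: String,
      taskEpoch: Long): Boolean =
    failedEpoch.get(execId).exists(failed => taskEpoch <= failed)
}
```

With `failedEpoch = Map("executorA" -> 5L)`, a completion from `executorA` carrying epoch 5 would be treated as possibly bogus, while one carrying epoch 6, or any completion from an executor that was never marked failed, would be accepted.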




[GitHub] spark issue #16620: [SPARK-19263] DAGScheduler should avoid sending conflict...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/16620
  
    **[Test build #72023 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/72023/testReport)** for PR 16620 at commit [`be7e701`](https://github.com/apache/spark/commit/be7e701d0f01081eec5e25e2256eb45560fbe97b).




[GitHub] spark pull request #16620: [SPARK-19263] DAGScheduler should avoid sending c...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/16620#discussion_r99659930
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala ---
    @@ -519,6 +520,18 @@ class TestTaskScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
       }
     }
     
    +/** DAGScheduler that just tracks a tiny bit more state to enable checks in tests. */
    +class TestDAGScheduler(sc: SparkContext, taskScheduler: TaskScheduler)
    --- End diff --
    
    What about DAGSchedulerWithTracking, to make it clearer that this is a normal DAGScheduler with a little bit of extra tracking?

