Posted to reviews@spark.apache.org by witgo <gi...@git.apache.org> on 2014/08/10 17:05:49 UTC

[GitHub] spark pull request: [WIP][SPARK-2947] DAGScheduler resubmit the st...

GitHub user witgo opened a pull request:

    https://github.com/apache/spark/pull/1877

    [WIP][SPARK-2947] DAGScheduler resubmit the stage into an infinite loop

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/witgo/spark SPARK-2947

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1877.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1877
    
----
commit 71de1c0e67e6b0731913d28906b96be18d0a4a05
Author: GuoQiang Li <wi...@qq.com>
Date:   2014-08-10T15:03:23Z

    DAGScheduler resubmit the stage into an infinite loop

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53676634
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19375/consoleFull) for   PR 1877 at commit [`c4b0f91`](https://github.com/apache/spark/commit/c4b0f91d63aaacc2d62455ae01fcea307a4db6e8).
     * This patch merges cleanly.




[GitHub] spark pull request: [WIP][SPARK-2947] DAGScheduler resubmit the st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-51806432
  
    It will take some time to add a test for this.




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53902536
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19482/consoleFull) for   PR 1877 at commit [`bf6f81a`](https://github.com/apache/spark/commit/bf6f81a602c84c0d016c0f71cb93d567ce05d185).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-51887280
  
    QA tests have started for PR 1877. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18367/consoleFull




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53677259
  
     [SPARK-3224](https://issues.apache.org/jira/browse/SPARK-3224) is the same problem.
    This PR adds some boundary checks and removes some redundant code.




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53895213
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19482/consoleFull) for   PR 1877 at commit [`bf6f81a`](https://github.com/apache/spark/commit/bf6f81a602c84c0d016c0f71cb93d567ce05d185).
     * This patch merges cleanly.




[GitHub] spark pull request: [WIP][SPARK-2947] DAGScheduler resubmit the st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-51718834
  
    QA results for PR 1877:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test output:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18281/consoleFull




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by kayousterhout <gi...@git.apache.org>.
Github user kayousterhout commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1877#discussion_r16823367
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---
    @@ -472,6 +472,44 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
         assert(sparkListener.failedStages.size == 1)
       }
     
    +  test("run trivial shuffle with repeated fetch failure") {
    --- End diff --
    
    Can you change this and/or the name of the test at line 438? The two are currently so similar that it's unclear what each test is meant to cover.




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-55514815
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20293/consoleFull) for   PR 1877 at commit [`958d7db`](https://github.com/apache/spark/commit/958d7db4eb83ca493c1cd49ce2ba4f32d19f39f1).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class RatingDeserializer(FramedSerializer):`
      * `  class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T] `
      * `  class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T] `
      * `  class Encoder[T <: NativeType](columnType: NativeColumnType[T]) extends compression.Encoder[T] `
      * `  class Encoder extends compression.Encoder[IntegerType.type] `
      * `  class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[IntegerType.type])`
      * `  class Encoder extends compression.Encoder[LongType.type] `
      * `  class Decoder(buffer: ByteBuffer, columnType: NativeColumnType[LongType.type])`





[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-54976720
  
    screenshots:
    ![qq20140909-1](https://cloud.githubusercontent.com/assets/302879/4203071/131c5292-382d-11e4-88d3-6d9bb50a8389.png)





[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-55482903
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/85/consoleFull) for   PR 1877 at commit [`bf6f81a`](https://github.com/apache/spark/commit/bf6f81a602c84c0d016c0f71cb93d567ce05d185).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `      throw new IllegalStateException("The main method in the given main class must be static")`
      * `class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception `
      * `        class Dummy(object):`





[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53372719
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19194/consoleFull) for   PR 1877 at commit [`3484c29`](https://github.com/apache/spark/commit/3484c29166ff68a9b54b7a3e0df59e1bd1e389cf).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "$`
      * `    $FWDIR/bin/spark-submit --class org.apache.spark.repl.Main "$`
      * `class KMeansModel (val clusterCenters: Array[Vector]) extends Serializable `
      * `class BoundedFloat(float):`
      * `class JoinedRow2 extends Row `
      * `class JoinedRow3 extends Row `
      * `class JoinedRow4 extends Row `
      * `class JoinedRow5 extends Row `
      * `class GenericRow(protected[sql] val values: Array[Any]) extends Row `
      * `abstract class MutableValue extends Serializable `
      * `final class MutableInt extends MutableValue `
      * `final class MutableFloat extends MutableValue `
      * `final class MutableBoolean extends MutableValue `
      * `final class MutableDouble extends MutableValue `
      * `final class MutableShort extends MutableValue `
      * `final class MutableLong extends MutableValue `
      * `final class MutableByte extends MutableValue `
      * `final class MutableAny extends MutableValue `
      * `final class SpecificMutableRow(val values: Array[MutableValue]) extends MutableRow `
      * `case class CountDistinct(expressions: Seq[Expression]) extends PartialAggregate `
      * `case class CollectHashSet(expressions: Seq[Expression]) extends AggregateExpression `
      * `case class CollectHashSetFunction(`
      * `case class CombineSetsAndCount(inputSet: Expression) extends AggregateExpression `
      * `case class CombineSetsAndCountFunction(`
      * `case class CountDistinctFunction(`
      * `case class MaxOf(left: Expression, right: Expression) extends Expression `
      * `case class NewSet(elementType: DataType) extends LeafExpression `
      * `case class AddItemToSet(item: Expression, set: Expression) extends Expression `
      * `case class CombineSets(left: Expression, right: Expression) extends BinaryExpression `
      * `case class CountSet(child: Expression) extends UnaryExpression `
      * `case class ExplainCommand(plan: LogicalPlan, extended: Boolean = false) extends Command `





[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53369829
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19194/consoleFull) for   PR 1877 at commit [`3484c29`](https://github.com/apache/spark/commit/3484c29166ff68a9b54b7a3e0df59e1bd1e389cf).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53676613
  
    Can you explain what problem you are seeing?




[GitHub] spark pull request: [WIP][SPARK-2947] DAGScheduler resubmit the st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1877#discussion_r16045934
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1024,31 +1024,33 @@ class DAGScheduler(
           case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
             // Mark the stage that the reducer was in as unrunnable
             val failedStage = stageIdToStage(task.stageId)
    -        runningStages -= failedStage
    -        // TODO: Cancel running tasks in the stage
    --- End diff --
    
    @mateiz 
    Running tasks in the stage are never cancelled, so whenever any of the remaining running tasks reports a fetch failure, the following code is executed again:
    
    ```scala
    failedStages += failedStage
    failedStages += mapStage
    ```
    As a result, the stage is unnecessarily resubmitted by `resubmitFailedStages`.
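A minimal Scala sketch of the failure mode described above. Everything in it is hypothetical: `Task`, `ToyStage` and `OldHandler` are toy stand-ins rather than Spark classes, and resubmission happens immediately instead of after `RESUBMIT_TIMEOUT`. The handler only checks that the stage is running, as the pre-patch code did, so stale FetchFailed reports from the first attempt keep failing the freshly resubmitted attempt.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for scheduler state; these are not Spark classes.
final case class Task(stageId: Int, attempt: Int, partition: Int)

final class ToyStage(val id: Int) {
  var attempt = 0
  val pendingTasks = mutable.HashSet.empty[Task]

  // Starts a new attempt with one pending task per partition.
  def submit(numPartitions: Int): Unit = {
    attempt += 1
    pendingTasks.clear()
    (0 until numPartitions).foreach(p => pendingTasks += Task(id, attempt, p))
  }
}

// Mimics the pre-patch guard: only "is the stage running?" is checked.
final class OldHandler(stage: ToyStage, numPartitions: Int) {
  val runningStages = mutable.HashSet(stage.id)
  var resubmissions = 0

  def onFetchFailed(task: Task): Unit =
    if (runningStages.contains(task.stageId)) {
      runningStages -= task.stageId
      // Stand-in for ResubmitFailedStages: the stage runs again as a new attempt.
      resubmissions += 1
      stage.submit(numPartitions)
      runningStages += stage.id
    }
}

val stage = new ToyStage(1)
stage.submit(4)
val firstAttempt = stage.pendingTasks.toList
val handler = new OldHandler(stage, 4)
// Every task of attempt 1 reports FetchFailed, one after another.
firstAttempt.foreach(handler.onFetchFailed)
println(handler.resubmissions) // prints 4
```

In this toy model a single lost map output turns into four resubmissions. In the real scheduler each new attempt can leave stragglers of its own, which is presumably how the resubmission becomes unbounded.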




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53679879
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19375/consoleFull) for   PR 1877 at commit [`c4b0f91`](https://github.com/apache/spark/commit/c4b0f91d63aaacc2d62455ae01fcea307a4db6e8).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [WIP][SPARK-2947] DAGScheduler resubmit the st...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-51717429
  
    QA tests have started for PR 1877. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18281/consoleFull




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1877#discussion_r17510517
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1046,41 +1046,37 @@ class DAGScheduler(
     
           case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
             val failedStage = stageIdToStage(task.stageId)
    -        val mapStage = shuffleToMapStage(shuffleId)
     
             // It is likely that we receive multiple FetchFailed for a single stage (because we have
             // multiple tasks running concurrently on different executors). In that case, it is possible
             // the fetch failure has already been handled by the scheduler.
    -        if (runningStages.contains(failedStage)) {
    +        if (runningStages.contains(failedStage) && stage.pendingTasks.contains(task)) {
    +          val mapStage = shuffleToMapStage(shuffleId)
               logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
                 s"due to a fetch failure from $mapStage (${mapStage.name})")
    -          markStageAsFinished(failedStage, Some("Fetch failure"))
    -          runningStages -= failedStage
    -        }
    -
    -        if (failedStages.isEmpty && eventProcessActor != null) {
    -          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
    -          // in that case the event will already have been scheduled. eventProcessActor may be
    -          // null during unit tests.
               // TODO: Cancel running tasks in the stage
    -          import env.actorSystem.dispatcher
    -          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -            s"$failedStage (${failedStage.name}) due to fetch failure")
    -          env.actorSystem.scheduler.scheduleOnce(
    -            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
    -        }
    -        failedStages += failedStage
    -        failedStages += mapStage
    +          markStageAsFinished(failedStage, Some("Fetch failure"))
    +          if (eventProcessActor != null) {
    +            // eventProcessActor may be null during unit tests.
    +            import env.actorSystem.dispatcher
    +            logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    +              s"$failedStage (${failedStage.name}) due to fetch failure")
    +            env.actorSystem.scheduler.scheduleOnce(
    +              RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
    +          }
    +          failedStages += failedStage
    +          failedStages += mapStage
     
    -        // Mark the map whose fetch failed as broken in the map stage
    -        if (mapId != -1) {
    -          mapStage.removeOutputLoc(mapId, bmAddress)
    -          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    -        }
    +          // Mark the map whose fetch failed as broken in the map stage
    +          if (mapId != -1) {
    +            mapStage.removeOutputLoc(mapId, bmAddress)
    +            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    +          }
     
    -        // TODO: mark the executor as failed only if there were lots of fetch failures on it
    -        if (bmAddress != null) {
    -          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
    +          // TODO: mark the executor as failed only if there were lots of fetch failures on it
    +          if (bmAddress != null) {
    --- End diff --
    
    Once you put this inside the conditional statement, only one executor failure will be handled per stage. That means if two executors fail, the second one gets ignored by the DAGScheduler, right?
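rxin's concern can be illustrated with a small hypothetical model (`FetchFailure` and `Handler` are illustrative names, not Spark's code). Two reduce tasks in the same attempt fail to fetch from two different executors. If the executor-loss step sits inside the once-per-stage guard, only the first executor is ever recorded as lost.

```scala
import scala.collection.mutable

// Hypothetical types for illustration; these are not Spark APIs.
final case class FetchFailure(taskId: Int, executorId: String)

// Models one stage attempt that should be marked failed only once.
final class Handler(executorLossInsideGuard: Boolean) {
  private var stageRunning = true
  val lostExecutors = mutable.LinkedHashSet.empty[String]

  def onFetchFailed(f: FetchFailure): Unit = {
    if (stageRunning) {
      stageRunning = false // only the first report fails the stage
      if (executorLossInsideGuard) lostExecutors += f.executorId
    }
    // Executor-loss handling outside the guard runs for every report.
    if (!executorLossInsideGuard) lostExecutors += f.executorId
  }
}

val failures = List(FetchFailure(0, "exec-1"), FetchFailure(1, "exec-2"))

val inside = new Handler(executorLossInsideGuard = true)
failures.foreach(inside.onFetchFailed)
println(inside.lostExecutors) // only exec-1 is recorded

val outside = new Handler(executorLossInsideGuard = false)
failures.foreach(outside.onFetchFailed)
println(outside.lostExecutors) // exec-1 and exec-2 are both recorded
```

Keeping the executor-loss step outside the guard, as the original code did, records both executors while the stage itself is still failed only once.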




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by witgo <gi...@git.apache.org>.
Github user witgo closed the pull request at:

    https://github.com/apache/spark/pull/1877




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1877#discussion_r17510561
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1046,41 +1046,37 @@ class DAGScheduler(
     
           case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
             val failedStage = stageIdToStage(task.stageId)
    -        val mapStage = shuffleToMapStage(shuffleId)
     
             // It is likely that we receive multiple FetchFailed for a single stage (because we have
             // multiple tasks running concurrently on different executors). In that case, it is possible
             // the fetch failure has already been handled by the scheduler.
    -        if (runningStages.contains(failedStage)) {
    +        if (runningStages.contains(failedStage) && stage.pendingTasks.contains(task)) {
    +          val mapStage = shuffleToMapStage(shuffleId)
               logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
                 s"due to a fetch failure from $mapStage (${mapStage.name})")
    -          markStageAsFinished(failedStage, Some("Fetch failure"))
    -          runningStages -= failedStage
    -        }
    -
    -        if (failedStages.isEmpty && eventProcessActor != null) {
    -          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
    -          // in that case the event will already have been scheduled. eventProcessActor may be
    -          // null during unit tests.
               // TODO: Cancel running tasks in the stage
    -          import env.actorSystem.dispatcher
    -          logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    -            s"$failedStage (${failedStage.name}) due to fetch failure")
    -          env.actorSystem.scheduler.scheduleOnce(
    -            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
    -        }
    -        failedStages += failedStage
    -        failedStages += mapStage
    +          markStageAsFinished(failedStage, Some("Fetch failure"))
    +          if (eventProcessActor != null) {
    +            // eventProcessActor may be null during unit tests.
    +            import env.actorSystem.dispatcher
    +            logInfo(s"Resubmitting $mapStage (${mapStage.name}) and " +
    +              s"$failedStage (${failedStage.name}) due to fetch failure")
    +            env.actorSystem.scheduler.scheduleOnce(
    +              RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
    +          }
    +          failedStages += failedStage
    +          failedStages += mapStage
     
    -        // Mark the map whose fetch failed as broken in the map stage
    -        if (mapId != -1) {
    -          mapStage.removeOutputLoc(mapId, bmAddress)
    -          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    -        }
    +          // Mark the map whose fetch failed as broken in the map stage
    +          if (mapId != -1) {
    +            mapStage.removeOutputLoc(mapId, bmAddress)
    +            mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    +          }
     
    -        // TODO: mark the executor as failed only if there were lots of fetch failures on it
    -        if (bmAddress != null) {
    -          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
    +          // TODO: mark the executor as failed only if there were lots of fetch failures on it
    +          if (bmAddress != null) {
    --- End diff --
    
    @rxin Yes, the processing logic here was changed unnecessarily; that was an oversight on my part.




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1877#discussion_r17302741
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1046,41 +1046,37 @@ class DAGScheduler(
     
           case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
             val failedStage = stageIdToStage(task.stageId)
    -        val mapStage = shuffleToMapStage(shuffleId)
     
             // It is likely that we receive multiple FetchFailed for a single stage (because we have
             // multiple tasks running concurrently on different executors). In that case, it is possible
             // the fetch failure has already been handled by the scheduler.
    -        if (runningStages.contains(failedStage)) {
    +        if (runningStages.contains(failedStage) && stage.pendingTasks.contains(task)) {
    --- End diff --
    
    @rxin Because running tasks in the stage are not cancelled, the `stage.pendingTasks.contains(task)` check is necessary.
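    To make the race concrete, here is a hypothetical Scala model, assuming (as the comment says) that tasks from an old stage attempt keep running after a resubmission and can report `FetchFailed` late. `SimTask`, `SimStage`, `SimScheduler` and `FetchFailedGuardDemo` are illustrative names, not Spark classes, and the immediate `submit()` call stands in for the delayed `ResubmitFailedStages` event.

```scala
import scala.collection.mutable

// Hypothetical, heavily simplified model of the FetchFailed race discussed in this PR.
// These are illustrative stand-ins, not Spark's Task/Stage/DAGScheduler classes.
// SimTask equality includes the attempt number, so a task from an old attempt is
// never equal to a task from the current one (Spark relies on distinct Task objects).
final case class SimTask(stageId: Int, attempt: Int, partition: Int)

final class SimStage(val id: Int, val numPartitions: Int) {
  var attempt = 0
  val pendingTasks = mutable.HashSet.empty[SimTask]

  // Start a new attempt. Tasks from earlier attempts are not cancelled;
  // they simply stop being tracked in pendingTasks.
  def newAttempt(): Seq[SimTask] = {
    attempt += 1
    pendingTasks.clear()
    val tasks = (0 until numPartitions).map(p => SimTask(id, attempt, p))
    pendingTasks ++= tasks
    tasks
  }
}

final class SimScheduler(stage: SimStage, guarded: Boolean) {
  val runningStages = mutable.HashSet.empty[SimStage]
  var resubmissions = 0

  def submit(): Seq[SimTask] = {
    runningStages += stage
    stage.newAttempt()
  }

  // Models the FetchFailed branch. Only the guarded variant also requires the
  // reporting task to belong to the stage's current pendingTasks.
  def onFetchFailed(task: SimTask): Unit = {
    val fromCurrentAttempt = !guarded || stage.pendingTasks.contains(task)
    if (runningStages.contains(stage) && fromCurrentAttempt) {
      runningStages -= stage
      resubmissions += 1
      submit() // stands in for the delayed ResubmitFailedStages event
    }
  }
}

object FetchFailedGuardDemo {
  // Every task of the first attempt reports FetchFailed, but the resubmission
  // lands between the first report and the remaining (stale) ones.
  def run(guarded: Boolean): Int = {
    val stage = new SimStage(id = 1, numPartitions = 4)
    val scheduler = new SimScheduler(stage, guarded)
    val firstAttempt = scheduler.submit()
    firstAttempt.foreach(scheduler.onFetchFailed)
    scheduler.resubmissions
  }

  def main(args: Array[String]): Unit = {
    println(s"unguarded resubmissions: ${run(guarded = false)}")
    println(s"guarded resubmissions:   ${run(guarded = true)}")
  }
}
```

    With four late failures from the first attempt, the unguarded variant resubmits the stage four times, while the variant that also checks `pendingTasks` resubmits it once. In a real cluster each redundant resubmission can itself be hit by further stale failures, which is one plausible way the repeated resubmission described in the PR title could arise.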




[GitHub] spark pull request: [WIP][SPARK-2947] DAGScheduler resubmit the st...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1877#discussion_r16046073
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala ---
    @@ -1024,31 +1024,33 @@ class DAGScheduler(
           case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
             // Mark the stage that the reducer was in as unrunnable
             val failedStage = stageIdToStage(task.stageId)
    -        runningStages -= failedStage
    -        // TODO: Cancel running tasks in the stage
    -        logInfo("Marking " + failedStage + " (" + failedStage.name +
    -          ") for resubmision due to a fetch failure")
    -        // Mark the map whose fetch failed as broken in the map stage
    -        val mapStage = shuffleToMapStage(shuffleId)
    -        if (mapId != -1) {
    -          mapStage.removeOutputLoc(mapId, bmAddress)
    -          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
    -        }
    -        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
    -          "); marking it for resubmission")
    -        if (failedStages.isEmpty && eventProcessActor != null) {
    -          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
    -          // in that case the event will already have been scheduled. eventProcessActor may be
    -          // null during unit tests.
    -          import env.actorSystem.dispatcher
    -          env.actorSystem.scheduler.scheduleOnce(
    -            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
    -        }
    -        failedStages += failedStage
    -        failedStages += mapStage
    -        // TODO: mark the executor as failed only if there were lots of fetch failures on it
    -        if (bmAddress != null) {
    -          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
    +        if (runningStages.contains(failedStage) && stage.pendingTasks.contains(task)) {
    --- End diff --
    
    This checks whether the task is still present in `stage.pendingTasks`, which avoids the issue described above.




[GitHub] spark pull request: [WIP][SPARK-2947] DAGScheduler resubmit the st...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-51730277
  
    @witgo can you explain how this happens and why the fix works, and add a unit test for it? We can't really merge something like this without a test.




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-55513899
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20293/consoleFull) for   PR 1877 at commit [`958d7db`](https://github.com/apache/spark/commit/958d7db4eb83ca493c1cd49ce2ba4f32d19f39f1).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-55482022
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/85/consoleFull) for   PR 1877 at commit [`bf6f81a`](https://github.com/apache/spark/commit/bf6f81a602c84c0d016c0f71cb93d567ce05d185).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-51892023
  
    QA results for PR 1877:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18367/consoleFull




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by witgo <gi...@git.apache.org>.
Github user witgo commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53676540
  
    @rxin  could you take a look at this PR? Thanks!




[GitHub] spark pull request: [SPARK-2947] DAGScheduler resubmit the stage i...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/1877#issuecomment-53326685
  
    @witgo Could you rebase this PR onto master? There are some conflicts right now.

