Posted to reviews@spark.apache.org by xuanyuanking <gi...@git.apache.org> on 2018/02/26 07:33:17 UTC

[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

GitHub user xuanyuanking opened a pull request:

    https://github.com/apache/spark/pull/20675

    [SPARK-23033][SS][Follow Up] Task level retry for continuous processing

    ## What changes were proposed in this pull request?
    
    This PR reimplements task-level retry for continuous processing. The changes include:
    1. Add a new `EpochCoordinatorMessage` named `GetLastEpochAndOffset`, used to fetch the last epoch and offset of a particular partition when a task is restarted.
    2. Add a `setOffset` function to `ContinuousDataReader`, which lets a BaseReader restart from a given offset.
    
    ## How was this patch tested?
    
    Added a new unit test in `ContinuousSuite` and a new `StreamAction` named `CheckAnswerRowsContainsOnlyOnce` for more accurate result checking.
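
The `GetLastEpochAndOffset` lookup named in the description can be pictured roughly as below. This is a minimal sketch in plain Java, not the Spark implementation: `EpochCoordinatorSketch`, `EpochAndOffset`, `report`, and the use of `long` offsets are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the coordinator side of GetLastEpochAndOffset.
// None of these names are taken from the Spark code base.
final class EpochCoordinatorSketch {

    // Last (epoch, offset) pair reported by one partition.
    static final class EpochAndOffset {
        final long epoch;
        final long offset;

        EpochAndOffset(long epoch, long offset) {
            this.epoch = epoch;
            this.offset = offset;
        }
    }

    private final Map<Integer, EpochAndOffset> lastReported = new ConcurrentHashMap<>();

    // Called whenever a partition finishes an epoch; only the newest epoch is kept.
    void report(int partitionId, long epoch, long offset) {
        lastReported.merge(
            partitionId,
            new EpochAndOffset(epoch, offset),
            (old, latest) -> latest.epoch >= old.epoch ? latest : old);
    }

    // What a restarted task would ask for: how far did this partition get?
    Optional<EpochAndOffset> getLastEpochAndOffset(int partitionId) {
        return Optional.ofNullable(lastReported.get(partitionId));
    }

    public static void main(String[] args) {
        EpochCoordinatorSketch coordinator = new EpochCoordinatorSketch();
        coordinator.report(0, 1L, 9L);
        coordinator.report(0, 2L, 19L);
        EpochAndOffset last = coordinator.getLastEpochAndOffset(0).get();
        System.out.println("partition 0: epoch " + last.epoch + ", offset " + last.offset);
    }
}
```

A restarted task would call the lookup once for its own partition and resume reading from the returned offset; a partition that never reported yields an empty `Optional`.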

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xuanyuanking/spark SPARK-23033

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20675.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20675
    
----
commit 21f574e2a3ad3c8e68b92776d2a141d7fcb90502
Author: Yuanjian Li <xy...@...>
Date:   2018-02-26T07:27:10Z

    [SPARK-23033][SS][Follow Up] Task level retry for continuous processing

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    @HeartSaVioR Thanks for your reply, and sorry for only just seeing your comment. Yep, I will keep tracking this feature after we support shuffled stateful operators.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87665/
    Test FAILed.


---



[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20675#discussion_r171014505
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java ---
    @@ -33,4 +33,16 @@
          * as a restart checkpoint.
          */
         PartitionOffset getOffset();
    +
    +    /**
    +     * Set the start offset for the current record, only used in task retry. If setOffset keep
    +     * default implementation, it means current ContinuousDataReader can't support task level retry.
    +     *
    +     * @param offset last offset before task retry.
    +     */
    +    default void setOffset(PartitionOffset offset) {
    --- End diff --
    
    I think it might be better to create a new interface ContinuousDataReaderFactory, and implement this there as something like `createDataReaderWithOffset(PartitionOffset offset)`. That way the intended lifecycle is explicit.
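
The factory shape suggested above could look something like the following. It is a sketch under stated assumptions, not the interface that was eventually added to Spark: `LongOffset`, `Reader`, `ReaderFactory`, `CountingFactory`, and `CountingReader` are hypothetical stand-ins, and only the method name `createDataReaderWithOffset` comes from the comment.

```java
// Hypothetical sketch: the start offset is chosen when the reader is created,
// so a reader never has its position changed after construction.
final class ReaderFactorySketch {

    // Stand-in for a partition offset; here it simply wraps a long.
    static final class LongOffset {
        final long value;

        LongOffset(long value) {
            this.value = value;
        }
    }

    // Stand-in for a continuous reader.
    interface Reader {
        boolean next();
        long get();
        LongOffset getOffset();
    }

    // Stand-in for the proposed factory interface.
    interface ReaderFactory {
        Reader createDataReader();
        Reader createDataReaderWithOffset(LongOffset offset);
    }

    // Toy source that emits consecutive longs.
    static final class CountingFactory implements ReaderFactory {
        private final long start;

        CountingFactory(long start) {
            this.start = start;
        }

        public Reader createDataReader() {
            return new CountingReader(start);
        }

        public Reader createDataReaderWithOffset(LongOffset offset) {
            // Resume with the record after the last one that was read.
            return new CountingReader(offset.value + 1);
        }
    }

    static final class CountingReader implements Reader {
        private long nextValue;
        private long current;

        CountingReader(long first) {
            this.nextValue = first;
            this.current = first - 1;
        }

        public boolean next() {
            current = nextValue++;
            return true;
        }

        public long get() {
            return current;
        }

        public LongOffset getOffset() {
            return new LongOffset(current);
        }
    }
}
```

With this shape a retried task would build a fresh reader from the last known offset instead of mutating an existing one, which is the explicit lifecycle the comment asks for.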


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    **[Test build #87665 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87665/testReport)** for PR 20675 at commit [`21f574e`](https://github.com/apache/spark/commit/21f574e2a3ad3c8e68b92776d2a141d7fcb90502).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20675#discussion_r170830121
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
    @@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
         spark.sparkContext.addSparkListener(listener)
         try {
           testStream(df, useV2Sink = true)(
    -        StartStream(Trigger.Continuous(100)),
    +        StartStream(longContinuousTrigger),
    +        AwaitEpoch(0),
             Execute(waitForRateSourceTriggers(_, 2)),
    +        IncrementEpoch(),
             Execute { _ =>
               // Wait until a task is started, then kill its first attempt.
               eventually(timeout(streamingTimeout)) {
                 assert(taskId != -1)
               }
               spark.sparkContext.killTaskAttempt(taskId)
             },
    -        ExpectFailure[SparkException] { e =>
    -          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
    -        })
    +        Execute(waitForRateSourceTriggers(_, 4)),
    +        IncrementEpoch(),
    +        // Check the answer exactly, if there's duplicated result, CheckAnserRowsContains
    +        // will also return true.
    +        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
    --- End diff --
    
    Actually, I first used `CheckAnswer(0 to 19: _*)` here, but the test case failed, probably because continuous processing (CP) does not always stop within Range(0, 20). See the logs below:
    ```
    == Plan ==
    == Parsed Logical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
    +- Project [value#13L]
       +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
    
    == Analyzed Logical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
    +- Project [value#13L]
       +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
    
    == Optimized Logical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
    +- Project [value#13L]
       +- StreamingDataSourceV2Relation [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
    
    == Physical Plan ==
    WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MemoryStreamWriter@6435422d
    +- *(1) Project [value#13L]
       +- *(1) DataSourceV2Scan [timestamp#12, value#13L], org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader@5c5d9c45
             
             
    ScalaTestFailureLocation: org.apache.spark.sql.streaming.StreamTest$class at (StreamTest.scala:436)
    org.scalatest.exceptions.TestFailedException: 
    
    == Results ==
    !== Correct Answer - 20 ==   == Spark Answer - 25 ==
    !struct<value:int>           struct<value:bigint>
     [0]                         [0]
     [10]                        [10]
     [11]                        [11]
     [12]                        [12]
     [13]                        [13]
     [14]                        [14]
     [15]                        [15]
     [16]                        [16]
     [17]                        [17]
     [18]                        [18]
     [19]                        [19]
     [1]                         [1]
    ![2]                         [20]
    ![3]                         [21]
    ![4]                         [22]
    ![5]                         [23]
    ![6]                         [24]
    ![7]                         [2]
    ![8]                         [3]
    ![9]                         [4]
    !                            [5]
    !                            [6]
    !                            [7]
    !                            [8]
    !                            [9]
        
    
    == Progress ==
       StartStream(ContinuousTrigger(3600000),org.apache.spark.util.SystemClock@343e225a,Map(),null)
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
       AssertOnQuery(<condition>, )
    => CheckAnswer: [0],[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19]
       StopStream
    ```
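
The difference between an exact check and a check that tolerates extra rows but rejects duplicates can be illustrated as follows. This is only a sketch in plain Java, assuming that "contains only once" means every expected row appears exactly once while additional rows (such as 20 to 24 in the log above) are ignored. `OnlyOnceCheck` and its method are hypothetical and are not the `StreamTest` implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Spark's StreamTest.
final class OnlyOnceCheck {

    // True if every expected value occurs exactly once in actual.
    // Values in actual that are not expected are ignored.
    static boolean containsOnlyOnce(List<Long> actual, List<Long> expected) {
        Map<Long, Integer> counts = new HashMap<>();
        for (Long value : actual) {
            counts.merge(value, 1, Integer::sum);
        }
        for (Long value : expected) {
            if (counts.getOrDefault(value, 0) != 1) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Long> expected = new ArrayList<>();
        for (long i = 0; i < 20; i++) {
            expected.add(i);
        }
        // The stream ran a little further than the expected range.
        List<Long> actual = new ArrayList<>();
        for (long i = 0; i < 25; i++) {
            actual.add(i);
        }
        System.out.println("exact match: " + actual.equals(expected));
        System.out.println("only once:   " + containsOnlyOnce(actual, expected));

        // A duplicate caused by a bad retry still passes a plain containment test.
        actual.add(5L);
        System.out.println("contains all after duplicate: " + actual.containsAll(expected));
        System.out.println("only once after duplicate:    " + containsOnlyOnce(actual, expected));
    }
}
```

Under these assumptions the exact comparison fails for the 25-row answer, plain containment cannot detect a replayed row, and the only-once check separates the two cases.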


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    > it just means that for very long-running streams task restarts will eventually run out.
    
    Ah, I see what you mean. Yeah, if we support task-level retry, we should also make the task retry count unlimited.
    
    > But if you're worried that the current implementation of task restart will become incorrect as more complex scenarios are supported, I'd definitely lean towards deferring it until continuous processing is more feature-complete.
    
    Yep, the "complex scenarios" I mentioned mainly include shuffle and aggregation scenarios, as in the comments at https://issues.apache.org/jira/browse/SPARK-20928?focusedCommentId=16245556&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16245556. In those scenarios, task-level retry may need to consider epoch alignment, but I think the current implementation of task restart is complete for map-only continuous processing.
    
    I agree with you about deferring it. Should I just leave a comment in SPARK-23033 and close this, or do you think it should be reviewed by others?
    
    > Do you want to spin that off into a separate PR? (I can handle it otherwise.)
    
    Of course, #20689 adds a new interface `ContinuousDataReaderFactory`, as discussed in our comments.
    



---



[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20675#discussion_r170665920
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala ---
    @@ -194,6 +194,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
         private def operatorName = if (lastOnly) "CheckLastBatch" else "CheckAnswer"
       }
     
    +  case class CheckAnswerRowsContainsOnlyOnce(expectedAnswer: Seq[Row], lastOnly: Boolean = false)
    --- End diff --
    
    no need to add this - redundant with CheckAnswer


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1053/
    Test PASSed.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    It's not semantically wrong that the attempt number is never reset; it just means that for very long-running streams task restarts will eventually run out.
    
    You make a good point that in high parallelism cases we might need to be able to restart only a single task, although I think we'd still need query-level restart on top of that. But if you're worried that the current implementation of task restart will become incorrect as more complex scenarios are supported, I'd definitely lean towards deferring it until continuous processing is more feature-complete.
    
    I was working on getting basic aggregation working, and I think we definitely will need some kind of setOffset-like functionality. Do you want to spin that off into a separate PR? (I can handle it otherwise.)


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    **[Test build #87665 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87665/testReport)** for PR 20675 at commit [`21f574e`](https://github.com/apache/spark/commit/21f574e2a3ad3c8e68b92776d2a141d7fcb90502).


---



[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20675#discussion_r170665692
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
    @@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
         spark.sparkContext.addSparkListener(listener)
         try {
           testStream(df, useV2Sink = true)(
    -        StartStream(Trigger.Continuous(100)),
    +        StartStream(longContinuousTrigger),
    +        AwaitEpoch(0),
             Execute(waitForRateSourceTriggers(_, 2)),
    +        IncrementEpoch(),
             Execute { _ =>
               // Wait until a task is started, then kill its first attempt.
               eventually(timeout(streamingTimeout)) {
                 assert(taskId != -1)
               }
               spark.sparkContext.killTaskAttempt(taskId)
             },
    -        ExpectFailure[SparkException] { e =>
    -          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
    -        })
    +        Execute(waitForRateSourceTriggers(_, 4)),
    +        IncrementEpoch(),
    +        // Check the answer exactly, if there's duplicated result, CheckAnserRowsContains
    +        // will also return true.
    +        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
    --- End diff --
    
    Checking exact answer can just be `CheckAnswer(0 to 20: _*)`.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by HeartSaVioR <gi...@git.apache.org>.
Github user HeartSaVioR commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    It looks like the patch is outdated, and once continuous queries support shuffled stateful operators, implementing task-level retry is not that trivial. To get a correct aggregation result, when one of the tasks fails at epoch N, all the tasks and states should be restored to epoch N.
    
    I definitely agree that it would be ideal to have stable task-level retry; I am just wondering whether this patch would work with the follow-up tasks for continuous mode.
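
The requirement that all tasks return to the same epoch after a single failure can be sketched as a small bookkeeping structure. The code below is a hypothetical illustration in plain Java, assuming that "restored to epoch N" means every partition resumes from the offsets committed at the end of the previous epoch. `EpochRestoreSketch`, `commit`, and `restoreAllTo` are not Spark APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical bookkeeping for a global (all-partition) restore.
final class EpochRestoreSketch {

    // epoch -> (partition id -> offset reached at the end of that epoch)
    private final TreeMap<Long, Map<Integer, Long>> committed = new TreeMap<>();

    // Record the end offsets of all partitions for a finished epoch.
    void commit(long epoch, Map<Integer, Long> endOffsets) {
        committed.put(epoch, new HashMap<>(endOffsets));
    }

    // Offsets that every partition should resume from so that all of them
    // restart at the beginning of failedEpoch, not just the failed task.
    Map<Integer, Long> restoreAllTo(long failedEpoch) {
        Map.Entry<Long, Map<Integer, Long>> previous = committed.lowerEntry(failedEpoch);
        if (previous == null) {
            throw new IllegalStateException("no committed epoch before " + failedEpoch);
        }
        // Progress recorded for the failed epoch or later is discarded.
        committed.tailMap(failedEpoch, true).clear();
        return new HashMap<>(previous.getValue());
    }
}
```

A per-task retry would rewind only one entry of such a map, which is why the comment argues it is not sufficient once shuffled state depends on every partition agreeing on the epoch.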


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Now that I think about it, we may eventually need a way to set the starting partition offset after creation for other reasons, so I'm less confident in those second and third reasons. But on the whole I still think converting to global restarts makes sense.


---



[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking closed the pull request at:

    https://github.com/apache/spark/pull/20675


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    cc @tdas and @jose-torres 
    #20225 gives a quick fix for task-level retry; this is just an attempt at a possibly better implementation. Please let me know if I did something wrong or have misunderstood Continuous Processing. Thanks :)


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    **[Test build #87666 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87666/testReport)** for PR 20675 at commit [`21f574e`](https://github.com/apache/spark/commit/21f574e2a3ad3c8e68b92776d2a141d7fcb90502).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20675#discussion_r171011587
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala ---
    @@ -219,18 +219,59 @@ class ContinuousSuite extends ContinuousSuiteBase {
         spark.sparkContext.addSparkListener(listener)
         try {
           testStream(df, useV2Sink = true)(
    -        StartStream(Trigger.Continuous(100)),
    +        StartStream(longContinuousTrigger),
    +        AwaitEpoch(0),
             Execute(waitForRateSourceTriggers(_, 2)),
    +        IncrementEpoch(),
             Execute { _ =>
               // Wait until a task is started, then kill its first attempt.
               eventually(timeout(streamingTimeout)) {
                 assert(taskId != -1)
               }
               spark.sparkContext.killTaskAttempt(taskId)
             },
    -        ExpectFailure[SparkException] { e =>
    -          e.getCause != null && e.getCause.getCause.isInstanceOf[ContinuousTaskRetryException]
    -        })
    +        Execute(waitForRateSourceTriggers(_, 4)),
    +        IncrementEpoch(),
    +        // Check the answer exactly, if there's duplicated result, CheckAnserRowsContains
    +        // will also return true.
    +        CheckAnswerRowsContainsOnlyOnce(scala.Range(0, 20).map(Row(_))),
    --- End diff --
    
    Ah, right, my bad.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87666/
    Test PASSed.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1054/
    Test PASSed.


---



[GitHub] spark pull request #20675: [SPARK-23033][SS][Follow Up] Task level retry for...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20675#discussion_r171161352
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/reader/streaming/ContinuousDataReader.java ---
    @@ -33,4 +33,16 @@
          * as a restart checkpoint.
          */
         PartitionOffset getOffset();
    +
    +    /**
    +     * Set the start offset for the current record, only used in task retry. If setOffset keep
    +     * default implementation, it means current ContinuousDataReader can't support task level retry.
    +     *
    +     * @param offset last offset before task retry.
    +     */
    +    default void setOffset(PartitionOffset offset) {
    --- End diff --
    
    Cool, that's much clearer.


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    **[Test build #87666 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87666/testReport)** for PR 20675 at commit [`21f574e`](https://github.com/apache/spark/commit/21f574e2a3ad3c8e68b92776d2a141d7fcb90502).


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    Many thanks for your detailed reply!
    > The semantics aren't quite right. Task-level retry can happen a fixed number of times for the lifetime of the task, which is the lifetime of the query - even if it runs for days after, the attempt number will never be reset.
    - I think the attempt number never being reset is not a problem, as long as the task starts with the right epoch and offset. Maybe I don't understand what you mean by the semantics; could you please explain more?
    - As far as I'm concerned, when we have higher parallelism, a whole-stage restart is too heavy an operation and will lead to data shaking.
    - One further thought: after CP supports shuffle and more complex scenarios, task-level retry will need more work to ensure the data is correct. But maybe it is still a useful feature? I just want to leave this patch here and start a discussion about it :)


---



[GitHub] spark issue #20675: [SPARK-23033][SS][Follow Up] Task level retry for contin...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/20675
  
    retest this please


---
