Posted to reviews@spark.apache.org by efimpoberezkin <gi...@git.apache.org> on 2018/05/22 11:10:39 UTC

[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

GitHub user efimpoberezkin opened a pull request:

    https://github.com/apache/spark/pull/21392

    [SPARK-24063][SS] Control maximum epoch backlog for ContinuousExecution

    ## What changes were proposed in this pull request?
    
    This pull request adds a maxEpochBacklog SQL configuration option. EpochCoordinator tracks whether the length of the queue of waiting epochs has exceeded it; if so, the stream is stopped with an error indicating that too many epochs have stacked up.
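
    For reference, a minimal sketch of what such an option could look like in SQLConf, using the existing buildConf DSL; the key name, doc text and default below are illustrative assumptions rather than the exact values in the patch:

        // Hypothetical declaration inside SQLConf; the key name and default are assumptions.
        val CONTINUOUS_STREAMING_MAX_EPOCH_BACKLOG =
          buildConf("spark.sql.streaming.continuous.maxEpochBacklog")
            .doc("Maximum number of epochs allowed to queue up waiting for an earlier " +
              "epoch to be committed before the continuous query is failed.")
            .intConf
            .checkValue(_ > 0, "Must be positive.")
            .createWithDefault(10000)

    A user would then tune it with something like spark.conf.set("spark.sql.streaming.continuous.maxEpochBacklog", 100), again assuming that key.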
    
    ## How was this patch tested?
    
    Existing unit tests


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/efimpoberezkin/spark pr/control-epoch-backlog

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21392.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21392
    
----
commit 1d2fc298284f6d553d78035f3095e5d2abe2a8a8
Author: Efim Poberezkin <ef...@...>
Date:   2018-04-25T13:01:29Z

    Add max epoch backlog option to SQLConf

commit 0919b3f7542aa0a807b0ac56e0da1366f347bb54
Author: Efim Poberezkin <ef...@...>
Date:   2018-05-07T10:11:21Z

    Replace logging an error with throwing an exception

----


---



[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

Posted by yanlin-Lynn <gi...@git.apache.org>.
Github user yanlin-Lynn commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190109933
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
           // If not, add the epoch being currently processed to epochs waiting to be committed,
           // otherwise commit it.
           if (lastCommittedEpoch != epoch - 1) {
    -        logDebug(s"Epoch $epoch has received commits from all partitions " +
    -          s"and is waiting for epoch ${epoch - 1} to be committed first.")
    -        epochsWaitingToBeCommitted.add(epoch)
    +        if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
    +          maxEpochBacklogExceeded = true
    +        } else {
    +          logDebug(s"Epoch $epoch has received commits from all partitions " +
    +            s"and is waiting for epoch ${epoch - 1} to be committed first.")
    +          epochsWaitingToBeCommitted.add(epoch)
    --- End diff --
    
    Once maxEpochBacklogExceeded is set to true, can it never be set back to false again?


---



[GitHub] spark issue #21392: [SPARK-24063][SS] Control maximum epoch backlog for Cont...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21392
  
    Hi all, any update on this PR?


---



[GitHub] spark issue #21392: [SPARK-24063][SS] Control maximum epoch backlog for Cont...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21392
  
    In that case, would you mind leaving this closed for now and reopening it when you start working on it again? I am trying to keep only active PRs open.


---



[GitHub] spark issue #21392: [SPARK-24063][SS] Control maximum epoch backlog for Cont...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21392
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

Posted by efimpoberezkin <gi...@git.apache.org>.
Github user efimpoberezkin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190160821
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
      */
     private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
     
    +/**
    + * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
    + */
    +private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
    --- End diff --
    
    Do you mean making the query fail directly from EpochCoordinator? If so, that's what I wanted to do, but I couldn't figure out how to terminate the query with an exception.
    EpochCoordinator takes query: ContinuousExecution as a parameter, but I don't see a suitable method on query; the closest is stop(), I guess.
    Or am I looking in a completely wrong direction? Please give me a hint.


---



[GitHub] spark issue #21392: [SPARK-24063][SS] Control maximum epoch backlog for Cont...

Posted by efimpoberezkin <gi...@git.apache.org>.
Github user efimpoberezkin commented on the issue:

    https://github.com/apache/spark/pull/21392
  
    @HyukjinKwon Hi, I haven't been working on it for some time now.


---



[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190316039
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
      */
     private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
     
    +/**
    + * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
    + */
    +private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
    --- End diff --
    
    I think we'd probably want to add some method like private[streaming] stopWithException(e) to ContinuousExecution.
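
    As an illustration only, such a helper might look roughly like the sketch below. It assumes ContinuousExecution can record a failure cause and interrupt the thread driving the query; failureReason is a made-up field, and the real change would need to surface the cause when the stream terminates.

        // Hypothetical sketch on ContinuousExecution; not the actual implementation.
        private[streaming] def stopWithException(cause: Throwable): Unit = {
          // Record why the query is being stopped so it can be reported to the user.
          failureReason = Some(cause)
          // Wake the query execution thread so the run loop stops waiting for new
          // epochs and can rethrow `cause` as the reason the stream failed.
          queryExecutionThread.interrupt()
        }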


---



[GitHub] spark issue #21392: [SPARK-24063][SS] Control maximum epoch backlog for Cont...

Posted by efimpoberezkin <gi...@git.apache.org>.
Github user efimpoberezkin commented on the issue:

    https://github.com/apache/spark/pull/21392
  
    @jose-torres Hi Jose, could you take a look at this PR, please? I had doubts about how to properly implement the error reporting logic we were discussing, and this is what I came up with.
    Also, please advise on how to test these changes. I wrote this several weeks ago, so if my memory doesn't fail me, I was thinking of an approach similar to the tests in ContinuousSuite with custom StreamActions, but I wasn't completely sure.
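
    To make the idea concrete, a rough sketch of such a test, assuming the StreamTest DSL (testStream, StartStream, ExpectFailure) can be combined with a continuous trigger and a custom action that holds back commits; AwaitEpochBacklog is a made-up placeholder for that custom StreamAction, and continuousQueryDF / maxEpochBacklog stand in for the query under test and the configured limit:

        testStream(continuousQueryDF)(
          StartStream(Trigger.Continuous(100)),
          // Hypothetical custom action: block commits until the waiting-epoch
          // queue grows past the configured limit.
          AwaitEpochBacklog(maxEpochBacklog + 1),
          // The stream is expected to die once the backlog overflows.
          ExpectFailure[IllegalStateException]()
        )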


---



[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

Posted by efimpoberezkin <gi...@git.apache.org>.
Github user efimpoberezkin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190158513
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -153,9 +161,13 @@ private[continuous] class EpochCoordinator(
           // If not, add the epoch being currently processed to epochs waiting to be committed,
           // otherwise commit it.
           if (lastCommittedEpoch != epoch - 1) {
    -        logDebug(s"Epoch $epoch has received commits from all partitions " +
    -          s"and is waiting for epoch ${epoch - 1} to be committed first.")
    -        epochsWaitingToBeCommitted.add(epoch)
    +        if (epochsWaitingToBeCommitted.size == maxEpochBacklog) {
    +          maxEpochBacklogExceeded = true
    +        } else {
    +          logDebug(s"Epoch $epoch has received commits from all partitions " +
    +            s"and is waiting for epoch ${epoch - 1} to be committed first.")
    +          epochsWaitingToBeCommitted.add(epoch)
    --- End diff --
    
    Based on what I discussed with Jose, the stream should be killed if the backlog exceeds the value of a certain config option, so yes, there is no reason to set it back to false later. At least that's how I see it.


---



[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190112873
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -233,9 +235,15 @@ class ContinuousExecution(
                   }
                   false
                 } else if (isActive) {
    -              currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
    -              logInfo(s"New epoch $currentBatchId is starting.")
    -              true
    +              val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
    +              if (maxBacklogExceeded) {
    +                throw new IllegalStateException(
    +                  "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
    --- End diff --
    
    Agreed that the code as written won't shut down the stream. But I think it does make sense to kill the stream rather than wait for the old epochs. If we end up with a large backlog, it's almost surely because some partition isn't making any progress, so I wouldn't expect Spark ever to be able to catch up.


---



[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

Posted by efimpoberezkin <gi...@git.apache.org>.
Github user efimpoberezkin closed the pull request at:

    https://github.com/apache/spark/pull/21392


---



[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190112179
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
      */
     private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
     
    +/**
    + * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
    + */
    +private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
    --- End diff --
    
    I'm not sure we need to make a side-channel in the RPC handler for this. I'd try to just make the query fail when the condition is reached in the first place.
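
    Roughly, the idea would be to react inside the commit path itself rather than latching a flag and polling it over an extra RPC message. A sketch, assuming the coordinator's query: ContinuousExecution reference and a stopWithException-style helper as suggested earlier in the thread:

        // Sketch only: fail the query directly from EpochCoordinator when the
        // waiting-epoch queue is full, instead of exposing a flag via RPC.
        if (epochsWaitingToBeCommitted.size >= maxEpochBacklog) {
          query.stopWithException(new IllegalStateException(
            s"Continuous query fell behind: more than $maxEpochBacklog epochs are " +
              s"waiting for epoch ${epoch - 1} to be committed."))
        } else {
          logDebug(s"Epoch $epoch has received commits from all partitions " +
            s"and is waiting for epoch ${epoch - 1} to be committed first.")
          epochsWaitingToBeCommitted.add(epoch)
        }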


---



[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

Posted by yanlin-Lynn <gi...@git.apache.org>.
Github user yanlin-Lynn commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190108352
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -233,9 +235,15 @@ class ContinuousExecution(
                   }
                   false
                 } else if (isActive) {
    -              currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
    -              logInfo(s"New epoch $currentBatchId is starting.")
    -              true
    +              val maxBacklogExceeded = epochEndpoint.askSync[Boolean](CheckIfMaxBacklogIsExceeded)
    +              if (maxBacklogExceeded) {
    +                throw new IllegalStateException(
    +                  "Size of the epochs queue has exceeded maximum allowed epoch backlog.")
    --- End diff --
    
    Throwing an exception will cause the application to fail.
    I think it's better to block and wait for the old epochs to be committed.


---



[GitHub] spark pull request #21392: [SPARK-24063][SS] Control maximum epoch backlog f...

Posted by efimpoberezkin <gi...@git.apache.org>.
Github user efimpoberezkin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21392#discussion_r190473131
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -45,6 +45,11 @@ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage
      */
     private[sql] case object StopContinuousExecutionWrites extends EpochCoordinatorMessage
     
    +/**
    + * Returns boolean indicating if size of the epochs queue has exceeded maximum epoch backlog.
    + */
    +private[sql] case object CheckIfMaxBacklogIsExceeded extends EpochCoordinatorMessage
    --- End diff --
    
    Okay, I had thought about something like this, but I wasn't sure whether it would be appropriate for the sake of this change. Thanks.


---



[GitHub] spark issue #21392: [SPARK-24063][SS] Control maximum epoch backlog for Cont...

Posted by efimpoberezkin <gi...@git.apache.org>.
Github user efimpoberezkin commented on the issue:

    https://github.com/apache/spark/pull/21392
  
    Sure, no problem.


---




---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org