Posted to reviews@spark.apache.org by mukulmurthy <gi...@git.apache.org> on 2018/09/10 23:57:11 UTC

[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

GitHub user mukulmurthy opened a pull request:

    https://github.com/apache/spark/pull/22386

    [SPARK-25399] Continuous processing state should not affect microbatch execution jobs

    ## What changes were proposed in this pull request?
    
    The leftover state from running a continuous processing streaming job should not affect later microbatch execution jobs. If a continuous processing job runs and the same thread gets reused for a microbatch execution job in the same environment, the microbatch job could get wrong answers because it can attempt to load the wrong version of the state.
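
A minimal sketch of the idea behind this change, assuming the decision is driven by an explicit per-job property rather than by ambient per-thread state. The key string is the one that appears in the diff under review; `VersionChoice`, `chooseVersion`, and the use of `java.util.Properties` as a stand-in for task-local properties are hypothetical and are not Spark's actual code.

```scala
import java.util.Properties

object VersionChoice {
  // Key string as it appears in the diff; the surrounding object is hypothetical.
  val IsContinuousProcessing = "__is_continuous_processing"

  /**
   * Picks the state store version to load.
   * Assumption: only a job explicitly marked continuous uses the current epoch;
   * every other job uses the version fixed at planning time.
   */
  def chooseVersion(
      props: Properties,
      plannedVersion: Long,
      currentEpoch: Option[Long]): Long = {
    // getProperty returns null when the key is absent, so wrap it in Option.
    val isContinuous =
      Option(props.getProperty(IsContinuousProcessing)).exists(_.toBoolean)
    if (isContinuous) currentEpoch.getOrElse(plannedVersion) else plannedVersion
  }
}
```

With this shape, a stale epoch left behind by an earlier job is ignored unless the current job's own properties mark it as continuous.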
    
    ## How was this patch tested?
    
    New and existing unit tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mukulmurthy/oss-spark 25399-streamthread

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22386
    
----
commit f91f365a34db55834f2f5f2eec65ece0eacfd29c
Author: Mukul Murthy <mu...@...>
Date:   2018-09-10T21:40:46Z

    Set a task context property to indicate we are using continuous processing

commit 2b86b2f79434585c9e2974f37c1d97b5830f0ef5
Author: Mukul Murthy <mu...@...>
Date:   2018-09-10T21:58:17Z

    Use property to decide which version instead of reading ThreadLocal variable

commit 0cf07515eebf609a0aec0c0b7ede934cc4465e21
Author: Mukul Murthy <mu...@...>
Date:   2018-09-10T23:51:18Z

    Set is continuous processing property to false for microbatch

commit c2f813bb46bd08ee808ef35ad9569fb9dc7194a6
Author: Mukul Murthy <mu...@...>
Date:   2018-09-10T23:51:32Z

    tests

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    **[Test build #95961 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95961/testReport)** for PR 22386 at commit [`4d4beef`](https://github.com/apache/spark/commit/4d4beef95e18d743e4b8f8bec71b7a6229cc58d3).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    Many thanks for your comment and fix, @mukulmurthy! We'll also port this soon.


---



[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22386#discussion_r216723249
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -1029,6 +1030,35 @@ class StreamSuite extends StreamTest {
             false))
       }
     
    +  test("is_continuous_processing property should be false for microbatch processing") {
    +    val input = MemoryStream[Int]
    +    val df = input.toDS()
    +      .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
    +    testStream(df) (
    +      AddData(input, 1),
    +      CheckAnswer("false")
    +    )
    +  }
    +
    +  test("is_continuous_processing property should be true for continuous processing") {
    +    val input = ContinuousMemoryStream[Int]
    +    var x: String = ""
    --- End diff --
    
    Unused var?


---



[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    **[Test build #95961 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95961/testReport)** for PR 22386 at commit [`4d4beef`](https://github.com/apache/spark/commit/4d4beef95e18d743e4b8f8bec71b7a6229cc58d3).


---



[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    LGTM. Just one super nit. 


---



[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22386#discussion_r216750746
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala ---
    @@ -74,9 +74,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
     
         // If we're in continuous processing mode, we should get the store version for the current
         // epoch rather than the one at planning time.
    -    val currentVersion = EpochTracker.getCurrentEpoch match {
    -      case None => storeVersion
    -      case Some(value) => value
    +    val isContinuous = Option(ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
    --- End diff --
    
    I think I'd rather keep it as is to be more resilient for the future.


---



[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22386#discussion_r216730172
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala ---
    @@ -74,9 +74,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
     
         // If we're in continuous processing mode, we should get the store version for the current
         // epoch rather than the one at planning time.
    -    val currentVersion = EpochTracker.getCurrentEpoch match {
    -      case None => storeVersion
    -      case Some(value) => value
    +    val isContinuous = Option(ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
    --- End diff --
    
    Would a simple `toBoolean` be enough here? You set a default value on both the MicroBatch and Continuous sides.


---



[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95961/
    Test PASSed.


---



[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22386#discussion_r216781599
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala ---
    @@ -74,9 +75,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
     
         // If we're in continuous processing mode, we should get the store version for the current
         // epoch rather than the one at planning time.
    -    val currentVersion = EpochTracker.getCurrentEpoch match {
    -      case None => storeVersion
    -      case Some(value) => value
    +    val isContinuous = Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
    +      .map(_.toBoolean)
    +    val currentVersion = if (isContinuous.contains(true)) {
    --- End diff --
    
    super nit: this looks weird. I would rather change the previous line to `val isContinuous = ... .map(_.toBoolean).getOrElse(false)`.
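
The two spellings discussed in this review thread can be compared in a small sketch. It assumes only that the property lookup returns `null` when the key was never set; `FlagParsing` and its method names are hypothetical. Both forms treat a missing property as `false`, whereas calling `toBoolean` directly on a `null` string throws an `IllegalArgumentException`.

```scala
object FlagParsing {
  /** Keeps an Option[Boolean] and tests it at the use site. */
  def viaContains(raw: String): Boolean =
    Option(raw).map(_.toBoolean).contains(true)

  /** Collapses to a plain Boolean up front, as the nit suggests. */
  def viaGetOrElse(raw: String): Boolean =
    Option(raw).map(_.toBoolean).getOrElse(false)

  /** Bare toBoolean: fine for "true"/"false", but throws on a null input. */
  def bare(raw: String): Boolean = raw.toBoolean
}
```

The two Option-based forms agree on every input; the difference is only whether later code works with an `Option[Boolean]` or a `Boolean`.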


---



[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    **[Test build #95959 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95959/testReport)** for PR 22386 at commit [`3ebbed3`](https://github.com/apache/spark/commit/3ebbed3b5ed09638cf5f2b4e31ff28ede7bf9e73).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    **[Test build #95959 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95959/testReport)** for PR 22386 at commit [`3ebbed3`](https://github.com/apache/spark/commit/3ebbed3b5ed09638cf5f2b4e31ff28ede7bf9e73).


---



[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    ```
    If a continuous processing job runs and the same thread gets reused for a microbatch execution job in the same environment
    ```
    I'm a little confused about this scenario; could you explain more?


---



[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    Can one of the admins verify this patch?


---



[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22386#discussion_r216728848
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
    @@ -391,6 +393,7 @@ class ContinuousExecution(
     }
     
     object ContinuousExecution {
    +  val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
    --- End diff --
    
    nit: I think this belongs in StreamExecution, since both ContinuousExecution and MicroBatchExecution set it.


---



[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    **[Test build #95908 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95908/testReport)** for PR 22386 at commit [`c2f813b`](https://github.com/apache/spark/commit/c2f813bb46bd08ee808ef35ad9569fb9dc7194a6).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95959/
    Test FAILed.


---





[GitHub] spark pull request #22386: [SPARK-25399][SS] Continuous processing state sho...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22386


---



[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    **[Test build #95908 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95908/testReport)** for PR 22386 at commit [`c2f813b`](https://github.com/apache/spark/commit/c2f813bb46bd08ee808ef35ad9569fb9dc7194a6).


---



[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22386#discussion_r216723459
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
    @@ -1029,6 +1030,35 @@ class StreamSuite extends StreamTest {
             false))
       }
     
    +  test("is_continuous_processing property should be false for microbatch processing") {
    +    val input = MemoryStream[Int]
    +    val df = input.toDS()
    +      .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
    +    testStream(df) (
    +      AddData(input, 1),
    +      CheckAnswer("false")
    +    )
    +  }
    +
    +  test("is_continuous_processing property should be true for continuous processing") {
    +    val input = ContinuousMemoryStream[Int]
    +    var x: String = ""
    --- End diff --
    
    unused?


---



[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    > ```
    > If a continuous processing job runs and the same thread gets reused for a microbatch execution job in the same environment
    > ```
    > I'm a little confused about this scenario; could you explain more? Does it only happen in unit tests, or could we hit it in a production environment?
    
    @xuanyuanking It could theoretically have been encountered in production, but continuous processing is considered an experimental feature. The only way to encounter it in production is to run a continuous processing stream and then a microbatch stream in the same Spark cluster and have an execution thread get reused. The bug is in StateStoreRDD: EpochTracker sets a ThreadLocal variable called currentEpoch, and StateStoreRDD checks for the existence of this variable to decide whether the current streaming job is continuous or microbatch.
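
The thread-reuse scenario described above can be reproduced outside Spark with a plain executor. This is a minimal sketch: `ThreadLocalLeakDemo` and its `currentEpoch` field are hypothetical stand-ins and are not Spark's `EpochTracker`. A single-thread pool is used so that the second task is guaranteed to run on the same worker thread as the first.

```scala
import java.util.concurrent.{Callable, Executors}

object ThreadLocalLeakDemo {
  // Hypothetical per-thread epoch marker; starts out empty on every thread.
  private val currentEpoch = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }

  /** Runs two tasks on one pooled thread and returns what the second one sees. */
  def epochSeenBySecondTask(): Option[Long] = {
    val pool = Executors.newSingleThreadExecutor()
    try {
      // First task (standing in for a continuous job) sets the value and never clears it.
      pool.submit(new Runnable {
        override def run(): Unit = currentEpoch.set(Some(7L))
      }).get()
      // Second task (standing in for a microbatch job) reuses the same worker thread.
      pool.submit(new Callable[Option[Long]] {
        override def call(): Option[Long] = currentEpoch.get()
      }).get()
    } finally {
      pool.shutdown()
    }
  }
}
```

The second task never set the value itself, yet it observes the epoch left behind by the first, which is why the mere presence of a thread-local value is an unreliable signal of the current job's mode.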


---



[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95908/
    Test PASSed.


---



[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...

Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on the issue:

    https://github.com/apache/spark/pull/22386
  
    @tdas and @jose-torres for review


---
