Posted to reviews@spark.apache.org by mukulmurthy <gi...@git.apache.org> on 2018/09/10 23:57:11 UTC
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
GitHub user mukulmurthy opened a pull request:
https://github.com/apache/spark/pull/22386
[SPARK-25399] Continuous processing state should not affect microbatch execution jobs
## What changes were proposed in this pull request?
The leftover state from running a continuous processing streaming job should not affect later microbatch execution jobs. If a continuous processing job runs and the same thread is later reused for a microbatch execution job in the same environment, the microbatch job could produce wrong answers because it may attempt to load the wrong version of the state.
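The failure mode can be sketched in plain Scala without a Spark dependency. `FakeEpochTracker`, `chooseVersion`, and the version numbers 42 and 7 are hypothetical stand-ins, loosely modeled on the EpochTracker ThreadLocal and the pre-fix version check in StateStoreRDD discussed in this thread. A single-thread executor forces the "microbatch" task onto the thread the "continuous" task already used.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical stand-in for EpochTracker: a per-thread "current epoch".
object FakeEpochTracker {
  private val currentEpoch: ThreadLocal[Option[Long]] = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }
  def setCurrentEpoch(epoch: Long): Unit = currentEpoch.set(Some(epoch))
  def getCurrentEpoch: Option[Long] = currentEpoch.get()
}

object ThreadReuseDemo {
  // Pre-fix style decision: "continuous" is inferred from the ThreadLocal being set.
  def chooseVersion(plannedVersion: Long): Long =
    FakeEpochTracker.getCurrentEpoch.getOrElse(plannedVersion)

  // Runs a "continuous" task, then a "microbatch" task, on one reused thread,
  // and returns the state version the microbatch task picked.
  def runScenario(): Long = {
    val pool = Executors.newSingleThreadExecutor() // one worker thread, so it is reused
    try {
      pool.submit(new Callable[Unit] {
        def call(): Unit = FakeEpochTracker.setCurrentEpoch(42L) // never cleared
      }).get()
      pool.submit(new Callable[Long] {
        def call(): Long = chooseVersion(plannedVersion = 7L)
      }).get()
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(s"microbatch loaded state version ${runScenario()}") // 42, not the planned 7
}
```

Because nothing resets the ThreadLocal between tasks, the second task inherits epoch 42 and behaves as if it were continuous.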
## How was this patch tested?
New and existing unit tests
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/mukulmurthy/oss-spark 25399-streamthread
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/22386.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #22386
----
commit f91f365a34db55834f2f5f2eec65ece0eacfd29c
Author: Mukul Murthy <mu...@...>
Date: 2018-09-10T21:40:46Z
Set a task context property to indicate we are using continuous processing
commit 2b86b2f79434585c9e2974f37c1d97b5830f0ef5
Author: Mukul Murthy <mu...@...>
Date: 2018-09-10T21:58:17Z
Use property to decide which version to load instead of reading ThreadLocal variable
commit 0cf07515eebf609a0aec0c0b7ede934cc4465e21
Author: Mukul Murthy <mu...@...>
Date: 2018-09-10T23:51:18Z
Set is continuous processing property to false for microbatch
commit c2f813bb46bd08ee808ef35ad9569fb9dc7194a6
Author: Mukul Murthy <mu...@...>
Date: 2018-09-10T23:51:32Z
tests
----
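The approach in these commits, deciding from a job-scoped property rather than from thread state, can be sketched the same way. This is a simulation under stated assumptions: a `Map[String, String]` stands in for the local properties that Spark tasks read through `TaskContext.getLocalProperty` (on the driver such properties are typically set with `SparkContext.setLocalProperty`), and `JobScopedPropertyDemo`, `chooseVersion`, and `runScenario` are hypothetical. Only the key name `__is_continuous_processing` comes from the PR.

```scala
import java.util.concurrent.{Callable, Executors}

object JobScopedPropertyDemo {
  // Property key as named in the PR; everything else here is a simulation.
  val IsContinuousProcessing = "__is_continuous_processing"

  // Per-thread epoch that a continuous task sets and does not clear (hypothetical).
  private val currentEpoch: ThreadLocal[Option[Long]] = new ThreadLocal[Option[Long]] {
    override def initialValue(): Option[Long] = None
  }

  // Post-fix style decision: the job's own property decides, not the thread's history.
  // `props` stands in for the local properties a Spark task would see.
  def chooseVersion(props: Map[String, String], plannedVersion: Long): Long = {
    val isContinuous = props.get(IsContinuousProcessing).exists(_.toBoolean)
    if (isContinuous) {
      currentEpoch.get().getOrElse(throw new IllegalStateException("no epoch set"))
    } else {
      plannedVersion
    }
  }

  // Same reused-thread scenario; returns (continuous version, microbatch version).
  def runScenario(): (Long, Long) = {
    val continuousProps = Map(IsContinuousProcessing -> "true")
    val microBatchProps = Map(IsContinuousProcessing -> "false") // set explicitly
    val pool = Executors.newSingleThreadExecutor()
    try {
      val continuousVersion = pool.submit(new Callable[Long] {
        def call(): Long = {
          currentEpoch.set(Some(42L)) // left behind on the pooled thread
          chooseVersion(continuousProps, plannedVersion = 0L)
        }
      }).get()
      val microBatchVersion = pool.submit(new Callable[Long] {
        def call(): Long = chooseVersion(microBatchProps, plannedVersion = 7L)
      }).get()
      (continuousVersion, microBatchVersion)
    } finally pool.shutdown()
  }
}
```

The microbatch job sets the flag to `false` explicitly, so the stale epoch on the reused thread is ignored; in this sketch a job that never sets the flag is also treated as microbatch.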
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22386
**[Test build #95961 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95961/testReport)** for PR 22386 at commit [`4d4beef`](https://github.com/apache/spark/commit/4d4beef95e18d743e4b8f8bec71b7a6229cc58d3).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:
https://github.com/apache/spark/pull/22386
Many thanks for your comment and fix, @mukulmurthy! We'll also port this soon.
---
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22386#discussion_r216723249
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -1029,6 +1030,35 @@ class StreamSuite extends StreamTest {
false))
}
+ test("is_continuous_processing property should be false for microbatch processing") {
+ val input = MemoryStream[Int]
+ val df = input.toDS()
+ .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+ testStream(df) (
+ AddData(input, 1),
+ CheckAnswer("false")
+ )
+ }
+
+ test("is_continuous_processing property should be true for continuous processing") {
+ val input = ContinuousMemoryStream[Int]
+ var x: String = ""
--- End diff --
Unused var?
---
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22386
**[Test build #95961 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95961/testReport)** for PR 22386 at commit [`4d4beef`](https://github.com/apache/spark/commit/4d4beef95e18d743e4b8f8bec71b7a6229cc58d3).
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the issue:
https://github.com/apache/spark/pull/22386
LGTM. Just one super nit.
---
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on a diff in the pull request:
https://github.com/apache/spark/pull/22386#discussion_r216750746
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala ---
@@ -74,9 +74,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
// If we're in continuous processing mode, we should get the store version for the current
// epoch rather than the one at planning time.
- val currentVersion = EpochTracker.getCurrentEpoch match {
- case None => storeVersion
- case Some(value) => value
+ val isContinuous = Option(ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
--- End diff --
I think I'd rather keep it as is to be more resilient for the future.
---
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22386
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22386
Merged build finished. Test FAILed.
---
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22386#discussion_r216730172
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala ---
@@ -74,9 +74,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
// If we're in continuous processing mode, we should get the store version for the current
// epoch rather than the one at planning time.
- val currentVersion = EpochTracker.getCurrentEpoch match {
- case None => storeVersion
- case Some(value) => value
+ val isContinuous = Option(ctxt.getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
--- End diff --
Is a simple `toBoolean` enough here? You set a default value on both the MicroBatch and Continuous sides.
---
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22386
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95961/
---
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/22386#discussion_r216781599
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala ---
@@ -74,9 +75,14 @@ class StateStoreRDD[T: ClassTag, U: ClassTag](
// If we're in continuous processing mode, we should get the store version for the current
// epoch rather than the one at planning time.
- val currentVersion = EpochTracker.getCurrentEpoch match {
- case None => storeVersion
- case Some(value) => value
+ val isContinuous = Option(ctxt.getLocalProperty(StreamExecution.IS_CONTINUOUS_PROCESSING))
+ .map(_.toBoolean)
+ val currentVersion = if (isContinuous.contains(true)) {
--- End diff --
super nit: this looks weird. Rather, I would change the previous line to `val isContinuous = ... .map(_.toBoolean).getOrElse(false)`
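Both spellings return the same result; the nit is about readability at the use site. The pure-Scala sketch below (object and method names are hypothetical) compares them on the raw property value, where `null` means the property was never set. `Option(raw)` is what absorbs a missing property, which appears to be the resilience point raised against calling `toBoolean` on the raw value directly.

```scala
object IsContinuousParsing {
  // `raw` is the local-property value as a String, or null if it was never set.

  // Shape under review: keep an Option[Boolean] and test it with `contains(true)`.
  def viaContains(raw: String): Boolean =
    Option(raw).map(_.toBoolean).contains(true)

  // Suggested shape: collapse to a plain Boolean, defaulting to false when unset.
  def viaGetOrElse(raw: String): Boolean =
    Option(raw).map(_.toBoolean).getOrElse(false)

  // Hypothetical version selection using the collapsed flag. When continuous, it falls
  // back to storeVersion if no epoch is known, mirroring the old `match` in the diff.
  def currentVersion(raw: String, epoch: Option[Long], storeVersion: Long): Long =
    if (viaGetOrElse(raw)) epoch.getOrElse(storeVersion) else storeVersion
}
```

With `getOrElse(false)` the following `if` tests a plain `Boolean`, so the condition no longer needs `contains(true)`.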
---
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22386
**[Test build #95959 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95959/testReport)** for PR 22386 at commit [`3ebbed3`](https://github.com/apache/spark/commit/3ebbed3b5ed09638cf5f2b4e31ff28ede7bf9e73).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22386
**[Test build #95959 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95959/testReport)** for PR 22386 at commit [`3ebbed3`](https://github.com/apache/spark/commit/3ebbed3b5ed09638cf5f2b4e31ff28ede7bf9e73).
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on the issue:
https://github.com/apache/spark/pull/22386
```
If a continuous processing job runs and the same thread gets reused for a microbatch execution job in the same environment
```
I'm a little confused about this scenario; could you explain more?
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22386
Merged build finished. Test PASSed.
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22386
Can one of the admins verify this patch?
---
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:
https://github.com/apache/spark/pull/22386#discussion_r216728848
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---
@@ -391,6 +393,7 @@ class ContinuousExecution(
}
object ContinuousExecution {
+ val IS_CONTINUOUS_PROCESSING = "__is_continuous_processing"
--- End diff --
nit: I think this belongs in StreamExecution, since both ContinuousExecution and MicroBatchExecution set it.
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22386
**[Test build #95908 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95908/testReport)** for PR 22386 at commit [`c2f813b`](https://github.com/apache/spark/commit/c2f813bb46bd08ee808ef35ad9569fb9dc7194a6).
* This patch passes all tests.
* This patch merges cleanly.
* This patch adds no public classes.
---
[GitHub] spark issue #22386: [SPARK-25399][SS] Continuous processing state should not...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22386
Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95959/
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22386
Can one of the admins verify this patch?
---
[GitHub] spark pull request #22386: [SPARK-25399][SS] Continuous processing state sho...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/22386
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:
https://github.com/apache/spark/pull/22386
**[Test build #95908 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95908/testReport)** for PR 22386 at commit [`c2f813b`](https://github.com/apache/spark/commit/c2f813bb46bd08ee808ef35ad9569fb9dc7194a6).
---
[GitHub] spark pull request #22386: [SPARK-25399] Continuous processing state should ...
Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:
https://github.com/apache/spark/pull/22386#discussion_r216723459
--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala ---
@@ -1029,6 +1030,35 @@ class StreamSuite extends StreamTest {
false))
}
+ test("is_continuous_processing property should be false for microbatch processing") {
+ val input = MemoryStream[Int]
+ val df = input.toDS()
+ .map(i => TaskContext.get().getLocalProperty(ContinuousExecution.IS_CONTINUOUS_PROCESSING))
+ testStream(df) (
+ AddData(input, 1),
+ CheckAnswer("false")
+ )
+ }
+
+ test("is_continuous_processing property should be true for continuous processing") {
+ val input = ContinuousMemoryStream[Int]
+ var x: String = ""
--- End diff --
unused?
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on the issue:
https://github.com/apache/spark/pull/22386
> ```
> If a continuous processing job runs and the same thread gets reused for a microbatch execution job in the same environment
> ```
> I'm a little confused about this scenario; could you explain more? I mean, does it only happen in unit tests, or could we also hit it in a production environment?
@xuanyuanking It could theoretically have been encountered in production, but continuous processing is considered an experimental feature. The only way to encounter it in production is to run a continuous processing stream and then a microbatch stream in the same Spark cluster and have an execution thread get reused. The bug is in StateStoreRDD: EpochTracker sets a ThreadLocal variable called currentEpoch, and StateStoreRDD checks for the existence of this variable to decide whether the current streaming job is continuous or microbatch. A microbatch task that lands on a thread where that leftover value is still set is therefore treated as continuous and loads the epoch's state version instead of the one chosen at planning time.
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:
https://github.com/apache/spark/pull/22386
Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95908/
---
[GitHub] spark issue #22386: [SPARK-25399] Continuous processing state should not aff...
Posted by mukulmurthy <gi...@git.apache.org>.
Github user mukulmurthy commented on the issue:
https://github.com/apache/spark/pull/22386
@tdas and @jose-torres for review
---