Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/10/18 04:15:55 UTC
[GitHub] [spark] uncleGen opened a new pull request #26156: Failed to get
state store when task number is not determinate
URL: https://github.com/apache/spark/pull/26156
### What changes were proposed in this pull request?
Add a new optimizer rule that adds a join hint to queries containing a stream-stream join.
### Why are the changes needed?
This is a correctness bug.
Currently, Spark uses the `TaskPartitionId` to determine the StateStore path.
```
TaskPartitionId \
StateStoreVersion --> StoreProviderId -> StateStore
StateStoreName /
```
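As a rough illustration of this dependency, the sketch below models a store identifier whose on-disk location embeds the partition ID. The `StoreId` class, its fields, the store name, and the path layout are simplified assumptions made for this example, not Spark's actual classes.

```scala
object StateStorePathSketch {
  // Hypothetical, simplified stand-in for a state store identifier.
  final case class StoreId(
      checkpointRoot: String,
      operatorId: Long,
      partitionId: Int,
      storeName: String) {
    // The partition ID is part of the directory, so a different ID
    // resolves to a different (possibly non-existent) location.
    def location: String =
      s"$checkpointRoot/$operatorId/$partitionId/$storeName"
  }

  def main(args: Array[String]): Unit = {
    // Illustrative values only.
    val previousBatch = StoreId("/checkpoint/state", 0L, 200, "left-store")
    val currentBatch = previousBatch.copy(partitionId = 1)

    println(previousBatch.location)
    println(currentBatch.location)
    // Same operator and store name, but the two batches look in different places.
    assert(previousBatch.location != currentBatch.location)
  }
}
```

If the partition ID assigned to the same logical partition differs between two batches, the second batch looks up state in a location the first batch never wrote to.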
Within a Spark stage, the task partition ID is determined by the number of tasks. As noted above, the StateStore file path depends on the task partition ID. So if the task partition IDs of a stream-stream join differ from those of the previous batch, the join reads the wrong StateStore data or fails because the StateStore data does not exist. This does happen in some corner cases. The following is sample pseudocode:
```
val df3 = streamDf1.join(streamDf2)
val df5 = streamDf3.join(batchDf4)
val df = df3.union(df5)
df.writeStream...start()
```
A simplified DAG looks like this:
```
DataSourceV2Scan   Scan Relation    DataSourceV2Scan   DataSourceV2Scan
  (streamDf3)            |            (streamDf1)        (streamDf2)
       |                 |                  |                  |
 Exchange(200)     Exchange(200)      Exchange(200)      Exchange(200)
       |                 |                  |                  |
      Sort             Sort                 |                  |
         \             /                      \              /
          \           /                        \            /
          SortMergeJoin                  StreamingSymmetricHashJoin
                  \                               /
                     \                         /
                        \                   /
                                Union
```
The stream-stream join task IDs run from 200 to 399, because the join is in the same stage as the `SortMergeJoin`. But when `streamDf3` has no new incoming data in some batch, it produces an empty LocalRelation, and the SortMergeJoin is replaced with a BroadcastHashJoin. In this case, the stream-stream join task IDs run from 1 to 200. As a result, the wrong StateStore path is derived from the TaskPartitionId, and the query fails with an error reading the state store delta file.
```
 LocalTableScan    Scan Relation    DataSourceV2Scan   DataSourceV2Scan
       |                 |                  |                  |
BroadcastExchange        |            Exchange(200)      Exchange(200)
       |                 |                  |                  |
       |                 |                  |                  |
         \             /                      \              /
          \           /                        \            /
        BroadcastHashJoin                StreamingSymmetricHashJoin
                  \                               /
                     \                         /
                        \                   /
                                Union
```
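The shift in task IDs can be reproduced with a small model of how a union numbers its partitions. The sketch assumes that each child's partitions are appended after those of the children before it, and that the batch relation scan has a single partition once the first branch becomes a broadcast join. The object and function names are hypothetical, and no Spark API is used.

```scala
object UnionPartitionSketch {
  // For each union child, compute the range of partition indices it occupies.
  // A child's first index equals the total partition count of the children before it.
  def partitionRanges(childPartitionCounts: Seq[Int]): Seq[Range] = {
    val offsets = childPartitionCounts.scanLeft(0)(_ + _)
    offsets.zip(childPartitionCounts).map { case (start, n) => start until (start + n) }
  }

  def main(args: Array[String]): Unit = {
    // Batch with data: the SortMergeJoin branch has 200 partitions,
    // followed by the stream-stream join branch with 200 partitions.
    val withData = partitionRanges(Seq(200, 200))(1)

    // Batch where streamDf3 is empty: the BroadcastHashJoin branch keeps the
    // partitioning of the batch scan (assumed here to be 1 partition).
    val emptyStream = partitionRanges(Seq(1, 200))(1)

    println(s"stream-stream join IDs with data:    ${withData.head} to ${withData.last}")
    println(s"stream-stream join IDs, empty batch: ${emptyStream.head} to ${emptyStream.last}")
  }
}
```

Under these assumptions the stream-stream join occupies indices 200 to 399 in the first plan and 1 to 200 in the second, so any state lookup keyed on the task partition ID points at a different location from one batch to the next.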
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
One new unit test was added.