Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/10/18 04:15:55 UTC

[GitHub] [spark] uncleGen opened a new pull request #26156: Failed to get state store when task number is not determinate

URL: https://github.com/apache/spark/pull/26156
 
 
   ### What changes were proposed in this pull request?
   
   Add a new optimizer rule that adds a join hint to queries containing a stream-stream join.
   
   ### Why are the changes needed?
   
   This is a correctness bug.
   Currently, Spark uses the `TaskPartitionId` to determine the StateStore path.
   
   ```
   TaskPartitionId   \ 
   StateStoreVersion  --> StoreProviderId -> StateStore
   StateStoreName    /  
   ```
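   To make the dependency concrete, here is a minimal sketch of how a partition id could end up inside a store location. `SketchStoreId`, `StorePathSketch` and `checkpointPath` are hypothetical names used only for illustration, and the `<root>/<operatorId>/<partitionId>/<storeName>` layout is an assumption rather than a copy of Spark's classes.

   ```scala
   // Hypothetical, simplified model; these are not Spark's actual classes.
   final case class SketchStoreId(operatorId: Long, partitionId: Int, storeName: String)

   object StorePathSketch {
     // Assumed layout: <root>/<operatorId>/<partitionId>/<storeName>.
     // The partition id is part of the path, so a different id means a different directory.
     def checkpointPath(root: String, id: SketchStoreId): String =
       s"$root/${id.operatorId}/${id.partitionId}/${id.storeName}"
   }
   ```

   Under this model, the same operator and store name with partition id 200 in one batch and a different id in the next resolve to two unrelated directories.
   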
   Within a Spark stage, a task's partition id depends on the number of tasks in that stage. Because the StateStore file path depends on the task partition id, if the partition id of a stream-stream join task differs from the one used in the previous batch, the task reads the wrong StateStore data or fails because the StateStore data does not exist. This does happen in some corner cases. The following pseudocode is a sample:
   
   ```
   val df3 = streamDf1.join(streamDf2)
   val df5 = streamDf3.join(batchDf4)
   val df = df3.union(df5)
   df.writeStream...start()
   ```
   The simplified DAG looks like this:
   
   ```
   DataSourceV2Scan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
    (streamDf3)            |               (streamDf1)        (streamDf2)
        |                  |                   |                 |
     Exchange(200)      Exchange(200)       Exchange(200)     Exchange(200)
        |                  |                   |                 | 
      Sort                Sort                 |                 |
        \                  /                   \                 /
         \                /                     \               /
           SortMergeJoin                    StreamingSymmetricHashJoin
                        \                 /
                          \             /
                            \         /
                               Union
   ```
   The stream-stream join task ids run from 200 to 399, because the join is in the same stage as the `SortMergeJoin`. But when `streamDf3` has no new incoming data in some batch, it produces an empty LocalRelation, and the SortMergeJoin is replaced with a BroadcastHashJoin. In that case, the stream-stream join task ids run from 1 to 200. As a result, the TaskPartitionId resolves to the wrong StateStore path, and the query fails with an error reading the state store delta file.
   
   ```
   LocalTableScan   Scan Relation     DataSourceV2Scan   DataSourceV2Scan
        |                  |                   |                 |
   BroadcastExchange       |              Exchange(200)     Exchange(200)
        |                  |                   |                 | 
        |                  |                   |                 |
         \                /                     \               /
          \             /                        \             /
         BroadcastHashJoin                 StreamingSymmetricHashJoin
                        \                 /
                          \             /
                            \         /
                               Union
   ```
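   The shift in task ids can be sketched with a small model, assuming the union numbers its output partitions by concatenating its children's partitions in order. `UnionPartitionSketch` and `secondChildPartitionIds` are hypothetical names for illustration, not Spark APIs.

   ```scala
   object UnionPartitionSketch {
     // Assumption: a union concatenates its children's partitions, so the ids
     // of the second child are offset by the partition count of the first child.
     def secondChildPartitionIds(firstChildPartitions: Int, secondChildPartitions: Int): Range =
       firstChildPartitions until (firstChildPartitions + secondChildPartitions)
   }
   ```

   With 200 partitions on each side, `secondChildPartitionIds(200, 200)` covers 200 to 399, as in the first DAG. If the first child instead had a single partition (an assumption about the batch relation after the switch to BroadcastHashJoin), the call `secondChildPartitionIds(1, 200)` would cover 1 to 200, so every stream-stream join task would look up a different partition id than in the previous batch.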
   
   ### Does this PR introduce any user-facing change?
   
   No
   
   ### How was this patch tested?
    
   One new unit test was added.
   
