You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/10/14 07:26:02 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request #26108: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows

HeartSaVioR opened a new pull request #26108: [SPARK-26154][SS] Streaming left/right outer join should not return outer nulls for already matched rows
URL: https://github.com/apache/spark/pull/26108
 
 
   ### What changes were proposed in this pull request?
   
   This patch fixes the edge case of streaming left/right outer join described below:
   
   Suppose query is provided as
   
   `select * from A join B on A.id = B.id AND (A.ts <= B.ts AND B.ts <= A.ts + interval 5 seconds)`
   
   and there're two rows for L1 (from A) and R1 (from B) which ensures L1.id = R1.id and L1.ts = R1.ts.
   (we can simply imagine it from self-join)
   
   Then Spark processes L1 and R1 as below:
   
   - row L1 and row R1 are joined at batch 1
   - row R1 is evicted at batch 2 due to join and watermark condition, whereas row L1 is not evicted
   - row L1 is evicted at batch 3 due to join and watermark condition
   
   When determining outer rows to match with null, Spark applies some assumption commented in codebase, as below:
   
   ```
   Checking whether the current row matches a key in the right side state, and that key	
   has any value which satisfies the filter function when joined. If it doesn't,	
   we know we can join with null, since there was never (including this batch) a match	
   within the watermark period. If it does, there must have been a match at some point, so	
   we know we can't join with null.
   ```
   
   But as explained the edge-case earlier, the assumption is not correct. As we don't have any good assumption to optimize which doesn't have edge-case, we have to track whether such row is matched with others before, and match with null row only when the row is not matched.
   
   To track the matching of row, the patch adds a new state to streaming join state manager, and mark whether the row is matched to others or not. We leverage the information when dealing with eviction of rows which would be candidates to match with null rows.
   
   This approach introduces new state format which is not compatible with old state format - queries with old state format will be still running but they will still have the issue and be required to discard checkpoint and rerun to take this patch in effect.
   
   ### Why are the changes needed?
   
   This patch fixes a correctness issue.
   
   ### Does this PR introduce any user-facing change?
   
   No for compatibility viewpoint, but we'll encourage end users to discard the old checkpoint and rerun the query if they run stream-stream outer join query with old checkpoint, which might be "yes" for the question.
   
   ### How was this patch tested?
   
   Added UT which fails on current Spark and passes with this patch. Also passed existing streaming join UTs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org