Posted to issues@spark.apache.org by "Leanken.Lin (Jira)" <ji...@apache.org> on 2020/08/18 02:24:00 UTC

[jira] [Updated] (SPARK-32644) NAAJ support for ShuffleHashJoin when AQE is on

     [ https://issues.apache.org/jira/browse/SPARK-32644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leanken.Lin updated SPARK-32644:
--------------------------------
    Description: 
In SPARK-32290, we optimized the null-aware anti join (NAAJ) scenario from BroadcastNestedLoopJoin (BNLJ) to BroadcastHashJoin (BHJ), but skipped checking the BuildSide plan against the *spark.sql.autoBroadcastJoinThreshold* parameter. In a bad case, a BuildSide plan that is too big might therefore cause a Driver OOM, so NAAJ support for ShuffledHashJoin (SHJ) is important as well.

 

Supporting NAAJ in SHJ is difficult in the NullKey scenario. For a normal HashedRelation and an EmptyHashedRelation, the code logic should be the same for BHJ and SHJ. But if a NullKey exists in the global BuildSide data, only one partition would be built into EmptyHashedRelationWithAllNullKeys, and that partition cannot do a *fast stop* for the entire RDD. After an offline discussion with some committers, we decided to support NAAJ for SHJ when AQE is on: with AQE on, the shuffle is executed ahead of the join, so we should be able to know whether the BuildSide contains a NullKey before the actual join executes.
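
The NullKey problem can be illustrated with a small plain-Python model. This is only a sketch of single-column NOT IN semantics, not Spark code: the function names, the integer keys, and the choice to route a null key to partition 0 are all assumptions made for illustration.

```python
from typing import List, Optional

Key = Optional[int]  # None models a SQL NULL join key


def null_aware_anti_join(streamed: List[Key], build: List[Key]) -> List[Key]:
    """Model of `streamed.key NOT IN (SELECT key FROM build)`."""
    if not build:
        # Empty build side: every streamed row passes, including null keys.
        return list(streamed)
    if any(k is None for k in build):
        # A null key on the build side means no row can pass ("fast stop").
        return []
    keys = set(build)
    # A null streamed key never passes against a non-empty build side.
    return [k for k in streamed if k is not None and k not in keys]


def partition(keys: List[Key], n: int) -> List[List[Key]]:
    """Hypothetical hash partitioning; null keys go to partition 0."""
    parts: List[List[Key]] = [[] for _ in range(n)]
    for k in keys:
        parts[0 if k is None else k % n].append(k)
    return parts


def naive_per_partition(streamed: List[Key], build: List[Key], n: int) -> List[Key]:
    """Apply the join independently per partition, as a naive SHJ would."""
    out: List[Key] = []
    for s_part, b_part in zip(partition(streamed, n), partition(build, n)):
        out.extend(null_aware_anti_join(s_part, b_part))
    return out


streamed_keys = [1, 2, 3, 4]
build_keys = [None, 2]

global_result = null_aware_anti_join(streamed_keys, build_keys)
naive_result = naive_per_partition(streamed_keys, build_keys, 2)
```

In this model `global_result` is empty, while `naive_result` still contains the rows from the partition that never saw the null key. That mismatch is why the join needs global knowledge of the BuildSide before it runs.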

 

Basically, in the NAAJ SHJ implementation, we collect whether the BuildSide is empty or contains a NullKey, and keep this information in the ShuffleExchangeExec metrics. During AQE, we rewrite these two cases to avoid the NAAJ altogether: an empty BuildSide becomes the StreamedSidePlan, and a BuildSide containing a NullKey becomes an empty LocalTableScan. A normal relation is processed in a distributed style.
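
The rewrite decision described above might be sketched as follows. This is a plain-Python illustration under stated assumptions, not the actual Spark rule: `BuildSideStats`, `Rewrite`, and `choose_plan` are hypothetical names, and the two fields stand in for whatever the ShuffleExchangeExec metrics record.

```python
from dataclasses import dataclass
from enum import Enum


class Rewrite(Enum):
    STREAMED_SIDE_PLAN = "return the streamed side unchanged"
    EMPTY_LOCAL_TABLE_SCAN = "return an empty relation"
    SHUFFLED_HASH_JOIN = "run the join in a distributed style"


@dataclass(frozen=True)
class BuildSideStats:
    """Hypothetical summary of the build side, known once the shuffle has run."""
    num_rows: int
    has_null_key: bool


def choose_plan(stats: BuildSideStats) -> Rewrite:
    if stats.num_rows == 0:
        # NOT IN over an empty set is true for every streamed row.
        return Rewrite.STREAMED_SIDE_PLAN
    if stats.has_null_key:
        # A null key on the build side means no streamed row can qualify.
        return Rewrite.EMPTY_LOCAL_TABLE_SCAN
    # Normal relation: keep the per-partition hash join.
    return Rewrite.SHUFFLED_HASH_JOIN
```

Because both special cases are decided before the join runs, no single partition has to signal a stop to the rest of the RDD.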

 

  was:
In [SPARK-32290|https://issues.apache.org/jira/browse/SPARK-32290], we managed to optimize NAAJ scenario from BNLJ to BHJ, but skipped the checking the BuildSide plan with *spark.sql.autoBroadcastJoinThreshold* parameter, which means in very bad case, BuildSide Plan being to big might cause Driver OOM. So the NAAJ support for  ShuffledHashJoin is important as well.

 

The support of SHJ for NAAJ has some difficulties in NullKey scenario, as for normal HashedRelation and EmtpyHashedRelation, the code logical should be the same when it comes to BHJ and SHJ, but if NullKey exists in global BuildSide data, and only one partition could be built into EmptyHashedRelationWithAllNullKeys, and this partition was not able to do *fast stop* for the entire RDD. So after offline talked with some committer and discussion, decided to support NAAJ for SHJ when AQE is on, because when AQE is on, Shuffle will be pre-executed, and we should be able to know that whether the BuildSide contains NullKey or not before the actual JOIN executed.

 

Basically, In NAAJ SHJ Implementation, we collected information whether BuildSide is Empty or contains NullKey, and keep these information in ShuffleExchangeExec Metrics, and during AQE, we rewritten these two case into LocalTableScan and StreamedSidePlan to Avoid NAAJ, as for the normal relation, we processed it in Distributed style.

 


> NAAJ support for ShuffleHashJoin when AQE is on
> -----------------------------------------------
>
>                 Key: SPARK-32644
>                 URL: https://issues.apache.org/jira/browse/SPARK-32644
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Leanken.Lin
>            Priority: Major
>


