You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@calcite.apache.org by "Jing Zhang (Jira)" <ji...@apache.org> on 2021/12/22 03:35:00 UTC

[jira] [Updated] (CALCITE-4957) Add new RelDistribution.Type to solve data skew caused by norma Hash Distribution type

     [ https://issues.apache.org/jira/browse/CALCITE-4957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jing Zhang updated CALCITE-4957:
--------------------------------
    Description: 
I hope to extend `RelDistribution` to support more distribution types in order to solve data skew in the normal hash distribution.

When we use hash distribution to bring all records with the same hash key to the same place, the job performance would be poor if there exists hot keys. 
There is a solution to solve this problem, we could send a hot key to one of serval downstream tasks, chosen at random.
In HashJoin, we could use random hash partition in one side, for the other input to the join, records relating to the hot key need to be replicated to all downstream tasks handling that key.
In HashAggregate, we could split the aggregate into partial-final if all the aggregation functions support splitting.
The 10th chapter in the book "Designing Data Intensive Applications" also refers this solution to solve data skew.

Anyway, we should first extend `RelDistribution` to support more distribution types, for example, hash random type. 

For example, there is a [lookup join|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join] in Flink. is typically used to enrich a table with data that is queried from an external system. 
For the following query

{code:sql}
select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day 
from default_catalog.default_database.probe as p
join partition_table_3 for system_time as of p.proc_time as b
on p.x=b.x and p.y=b.y
{code}


When use normal hash distribution.
The logical plan is as following,

{code:java}
   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
      +- Exchange(distribution=[hash[x, y]])
         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
{code}


If enable data_skew solution in hint, the logical plan is as following,

{code:java}
   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
      +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
{code}



  was:
I hope to extend `RelDistribution` to support more distribution types in order to solve data skew in the normal hash distribution.

When we use hash distribution to bring all records with the same hash key to the same place, the job performance would be poor if there exists hot keys. 
There is a solution to solve this problem, we could send a hot key to one of serval downstream tasks, chosen at random.
In HashJoin, we could use random hash partition in one side, for the other input to the join, records relating to the hot key need to be replicated to all downstream tasks handling that key.
In HashAggregate, we could split the aggregate into partial-final if all the aggregation functions support splitting.
The 10th chapter in the book "Designing Data Intensive Applications" also refers this solution to solve data skew.

Anyway, we should first extend `RelDistribution` to support more distribution types, for example, hash random type. 

For example, there is a lookup join in Flink. is typically used to enrich a table with data that is queried from an external system. 
For the following query

{code:sql}
select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day 
from default_catalog.default_database.probe as p
join partition_table_3 for system_time as of p.proc_time as b
on p.x=b.x and p.y=b.y
{code}


When use normal hash distribution.
The logical plan is as following,

{code:java}
   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
      +- Exchange(distribution=[hash[x, y]])
         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
{code}


If enable data_skew solution in hint, the logical plan is as following,

{code:java}
   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
      +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
{code}




> Add new RelDistribution.Type to solve data skew caused by norma Hash Distribution type
> --------------------------------------------------------------------------------------
>
>                 Key: CALCITE-4957
>                 URL: https://issues.apache.org/jira/browse/CALCITE-4957
>             Project: Calcite
>          Issue Type: Improvement
>          Components: core
>            Reporter: Jing Zhang
>            Priority: Major
>
> I hope to extend `RelDistribution` to support more distribution types in order to solve data skew in the normal hash distribution.
> When we use hash distribution to bring all records with the same hash key to the same place, the job performance would be poor if there exists hot keys. 
> There is a solution to solve this problem, we could send a hot key to one of serval downstream tasks, chosen at random.
> In HashJoin, we could use random hash partition in one side, for the other input to the join, records relating to the hot key need to be replicated to all downstream tasks handling that key.
> In HashAggregate, we could split the aggregate into partial-final if all the aggregation functions support splitting.
> The 10th chapter in the book "Designing Data Intensive Applications" also refers this solution to solve data skew.
> Anyway, we should first extend `RelDistribution` to support more distribution types, for example, hash random type. 
> For example, there is a [lookup join|https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/joins/#lookup-join] in Flink. is typically used to enrich a table with data that is queried from an external system. 
> For the following query
> {code:sql}
> select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day 
> from default_catalog.default_database.probe as p
> join partition_table_3 for system_time as of p.proc_time as b
> on p.x=b.x and p.y=b.y
> {code}
> When use normal hash distribution.
> The logical plan is as following,
> {code:java}
>    +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
>       +- Exchange(distribution=[hash[x, y]])
>          +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> {code}
> If enable data_skew solution in hint, the logical plan is as following,
> {code:java}
>    +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
>       +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
>          +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
> {code}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)