Posted to issues@calcite.apache.org by "Jing Zhang (Jira)" <ji...@apache.org> on 2021/12/22 03:02:00 UTC

[jira] [Created] (CALCITE-4957) Add new RelDistribution.Type to solve data skew caused by normal Hash Distribution type

Jing Zhang created CALCITE-4957:
-----------------------------------

             Summary: Add new RelDistribution.Type to solve data skew caused by normal Hash Distribution type
                 Key: CALCITE-4957
                 URL: https://issues.apache.org/jira/browse/CALCITE-4957
             Project: Calcite
          Issue Type: Improvement
          Components: core
            Reporter: Jing Zhang


I hope to extend `RelDistribution` to support more distribution types in order to mitigate the data skew caused by the normal hash distribution.

When we use hash distribution to bring all records with the same hash key to the same place, job performance can be poor if there are hot keys, because every record with a hot key is sent to a single downstream task.
One solution is to send each record with a hot key to one of several downstream tasks, chosen at random.
In a HashJoin, we could use a random hash partition on one side; for the other input to the join, records relating to the hot key need to be replicated to all downstream tasks handling that key.
In a HashAggregate, we could split the aggregate into a partial phase and a final phase if all the aggregation functions support splitting.
Chapter 10 of the book "Designing Data-Intensive Applications" also describes this solution for data skew.
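The join and aggregation strategies above can be sketched as follows. This is a minimal, hypothetical illustration, not Calcite or Flink code: the class `HotKeySkewSketch`, the `FANOUT` constant, and the choice of a key's task group as consecutive task indexes starting at `hash(key) mod parallelism` are all assumptions. The skewed join input goes to one random task in the key's group, the other input is replicated to every task in that group, and the partial-final split is shown for `COUNT` by salting keys in the first phase.

{code:java}
import java.util.*;

// Hypothetical sketch: none of these names are Calcite or Flink APIs.
class HotKeySkewSketch {
    // Assumed number of downstream tasks each key is spread over.
    static final int FANOUT = 4;

    // Task group for a key: FANOUT consecutive task indexes starting at hash(key) mod parallelism.
    static List<Integer> tasksForKey(Object key, int parallelism) {
        int base = Math.floorMod(key.hashCode(), parallelism);
        List<Integer> tasks = new ArrayList<>();
        for (int i = 0; i < Math.min(FANOUT, parallelism); i++) {
            tasks.add((base + i) % parallelism);
        }
        return tasks;
    }

    // Skewed join input: each record goes to ONE task of its key's group, chosen at random.
    static int probeTarget(Object key, int parallelism, Random rnd) {
        List<Integer> tasks = tasksForKey(key, parallelism);
        return tasks.get(rnd.nextInt(tasks.size()));
    }

    // Other join input: each record is replicated to ALL tasks of its key's group.
    static List<Integer> buildTargets(Object key, int parallelism) {
        return tasksForKey(key, parallelism);
    }

    // Partial-final COUNT. Phase 1 groups by (key, random salt); in a real job it would run
    // distributed by the salted key. Phase 2 merges the partial counts by the original key.
    // Both phases run locally here only to show the data flow.
    static Map<String, Long> partialFinalCount(List<String> keys, int salts, Random rnd) {
        Map<String, Long> partial = new HashMap<>();
        for (String key : keys) {
            partial.merge(key + "#" + rnd.nextInt(salts), 1L, Long::sum);
        }
        Map<String, Long> result = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            // Strip the salt suffix (everything after the last '#').
            String key = e.getKey().substring(0, e.getKey().lastIndexOf('#'));
            result.merge(key, e.getValue(), Long::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        int parallelism = 16;
        Set<Integer> used = new TreeSet<>();
        for (int i = 0; i < 1000; i++) {
            used.add(probeTarget("hot", parallelism, rnd));
        }
        System.out.println("probe-side tasks for 'hot': " + used);
        System.out.println("build-side tasks for 'hot': " + buildTargets("hot", parallelism));

        List<String> keys = new ArrayList<>(Collections.nCopies(1000, "hot"));
        keys.add("cold");
        System.out.println(partialFinalCount(keys, 8, rnd));
    }
}
{code}

Because each skewed-side record reaches exactly one task in its key's group and each replicated record reaches all of them, every matching pair still meets exactly once. The sketch spreads every key; a real planner might apply this only to keys it has identified as hot.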

As a first step, we should extend `RelDistribution` to support more distribution types, for example, a hash random type.

For example, Flink has a lookup join, which is typically used to enrich a table with data queried from an external system.
Consider the following query:

{code:sql}
select p.x, p.y, b.z, b.pt_year, b.pt_mon, b.pt_day 
from default_catalog.default_database.probe as p
join partition_table_3 for system_time as of p.proc_time as b
on p.x=b.x and p.y=b.y
{code}


With the normal hash distribution, the logical plan is as follows:

{code:java}
   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
      +- Exchange(distribution=[hash[x, y]])
         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
{code}


If the data skew solution is enabled through a hint, the logical plan is as follows:

{code:java}
   +- LookupJoin(joinType=[InnerJoin], lookup=[x=x, y=y], select=[x, y, x0, y0, z, pt_year, pt_mon, pt_day])
      +- Exchange(distribution=[hash_random(key=[x, y], bucket_num=8)])
         +- TableSourceScan(table=[[default_catalog, default_database, probe, source: [CollectionTableSource(x, y)]]], fields=[x, y])
{code}
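The `hash_random(key=[x, y], bucket_num=8)` exchange in the hinted plan could be modeled roughly as follows. This is a hedged sketch, not Calcite's `RelDistribution` API nor Flink's implementation: the class `HashRandomDistribution`, its fields, and the routing rule (start at `hash(key values) mod parallelism`, then add a random offset below `bucket_num`) are assumptions. Since the other input of a lookup join is the external system rather than a shuffled stream, the sketch assumes only the probe side needs routing and nothing is replicated.

{code:java}
import java.util.*;

// Hypothetical descriptor for a "hash random" distribution; not a Calcite or Flink class.
final class HashRandomDistribution {
    private final List<String> keys;   // distribution key columns, e.g. [x, y]
    private final int bucketNum;       // max number of tasks a single key may be spread over

    HashRandomDistribution(List<String> keys, int bucketNum) {
        if (keys.isEmpty()) throw new IllegalArgumentException("keys must not be empty");
        if (bucketNum < 1) throw new IllegalArgumentException("bucketNum must be >= 1");
        this.keys = List.copyOf(keys);
        this.bucketNum = bucketNum;
    }

    // Assumed routing rule: start at hash(key values) mod parallelism, then add a
    // random offset in [0, min(bucketNum, parallelism)), wrapping around.
    int targetTask(List<?> keyValues, int parallelism, Random rnd) {
        int base = Math.floorMod(keyValues.hashCode(), parallelism);
        int spread = Math.min(bucketNum, parallelism);
        return (base + rnd.nextInt(spread)) % parallelism;
    }

    // Renders a digest in the same style as the hinted Exchange in the plan above.
    @Override
    public String toString() {
        return "hash_random(key=" + keys + ", bucket_num=" + bucketNum + ")";
    }

    public static void main(String[] args) {
        HashRandomDistribution dist = new HashRandomDistribution(List.of("x", "y"), 8);
        System.out.println(dist); // hash_random(key=[x, y], bucket_num=8)
        Random rnd = new Random(7);
        Set<Integer> tasks = new TreeSet<>();
        for (int i = 0; i < 1000; i++) {
            tasks.add(dist.targetTask(List.of(1, "hot"), 32, rnd));
        }
        System.out.println("tasks used by key (1, hot): " + tasks);
    }
}
{code}

Bounding the spread with `bucket_num`, instead of distributing rows fully at random, keeps all rows of one key on at most `bucket_num` tasks. One plausible benefit, assumed here rather than stated in the issue, is that per-task lookup caches stay effective for hot keys.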




