Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2021/03/31 10:40:24 UTC

[GitHub] [incubator-doris] EmmyMiao87 opened a new issue #5589: Colocate plan

EmmyMiao87 opened a new issue #5589:
URL: https://github.com/apache/incubator-doris/issues/5589


   **Is your feature request related to a problem? Please describe.**
   
   Doris currently supports creating colocate tables and executing colocate joins,
   but it does not yet take full advantage of colocation.
   For example, when an Aggregation Node sits between a Hash Join Node and a Scan Node, the colocate join cannot be performed.
   Similarly, Aggregation, Sort, and Set Operation nodes could be absorbed into their child fragments when the data distribution matches, but this is not currently supported.
   
   **Describe the solution you'd like**
   
   When the data distribution allows it, upper-level operators can be absorbed by lower-level operators,
   reducing unnecessary network transmission as well as serialization and deserialization.
   
   **Describe alternatives you've considered**
   
   The main operators targeted by colocate optimization are the following four plan nodes:
   Hash Join Node
   Set Operation Node
   Aggregation Node
   Sort Node (in window function)
   
   Step 1: create the table
   ```
   CREATE TABLE `test_colocate` (
     `k1` int(11) NULL COMMENT "",
     `k2` int(11) NULL COMMENT "",
     `k3` int(11) NULL COMMENT "",
     `k4` int(11) NULL COMMENT ""
   ) ENGINE=OLAP
   DUPLICATE KEY(`k1`, `k2`, `k3`)
   COMMENT "OLAP"
   DISTRIBUTED BY HASH(`k1`, `k2`) BUCKETS 10
   PROPERTIES (
   "replication_num" = "2",
   "in_memory" = "false",
   "storage_format" = "V2"
   );
   ```
   
   Step 2: colocate join
   
   ```
   explain select * from (select k1, k2 from test_colocate group by k1, k2) a , test_colocate b where a.k1=b.k1 and a.k2=b.k2;
   +-------------------------------------------------------------------------------------------------------------------------+
   | Explain String                                                                                                          |
   +-------------------------------------------------------------------------------------------------------------------------+
   | PLAN FRAGMENT 0                                                                                                         |
   |  OUTPUT EXPRS:<slot 2> `k1` | <slot 3> `k2` | `b`.`k1` | `b`.`k2` | `b`.`k3` | `b`.`k4`                                 |
   |   PARTITION: UNPARTITIONED                                                                                              |
   |                                                                                                                         |
   |   RESULT SINK                                                                                                           |
   |                                                                                                                         |
   |   4:EXCHANGE                                                                                                            |
   |                                                                                                                         |
   | PLAN FRAGMENT 1                                                                                                         |
   |  OUTPUT EXPRS:                                                                                                          |
   |   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test_colocate`.`k1`, `default_cluster:test`.`test_colocate`.`k2` |
   |                                                                                                                         |
   |   STREAM DATA SINK                                                                                                      |
   |     EXCHANGE ID: 04                                                                                                     |
   |     UNPARTITIONED                                                                                                       |
   |                                                                                                                         |
   |   3:HASH JOIN                                                                                                           |
   |   |  join op: INNER JOIN                                                                                                |
   |   |  hash predicates:                                                                                                   |
   |   |  colocate: true                                                                                                     |
   |   |  equal join conjunct: <slot 2> `k1` = `b`.`k1`                                                                      |
   |   |  equal join conjunct: <slot 3> `k2` = `b`.`k2`                                                                      |
   |   |                                                                                                                     |
   |   |----2:OlapScanNode                                                                                                   |
   |   |       TABLE: test_colocate                                                                                          |
   |   |                                                                                                                     |
   |   1:AGGREGATE (update finalize)                                                                                         |
   |   |  group by: `k1`, `k2`                                                                                               |
   |   |                                                                                                                     |
   |   0:OlapScanNode                                                                                                        |
   |      TABLE: test_colocate                                                                                               |
   +-------------------------------------------------------------------------------------------------------------------------+
   47 rows in set (0.020 sec)
   
   ```
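A minimal Python sketch of why the join in Step 2 needs no exchange: when both inputs are bucketed by the same hash of the join key into the same number of buckets, joining bucket i only with bucket i gives the same result as a full join. The bucketing hash (`crc32` over the key's text form) is a hypothetical stand-in for whatever Doris uses internally, and the example is reduced to a single distribution column `id`.

```python
import zlib
from collections import defaultdict

BUCKETS = 10  # same bucket count as the example tables


def bucket_of(key):
    # Hypothetical bucketing hash; Doris's real hash function may differ,
    # but any deterministic function shared by both tables works here.
    return zlib.crc32(repr(key).encode("utf-8")) % BUCKETS


def distribute(rows, key_index):
    """Assign each row to a bucket using the column at key_index as the distribution column."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_of((row[key_index],))].append(row)
    return buckets


def global_join(left, right):
    """Reference result: equi-join on column 0 over all rows."""
    return sorted((l, r) for l in left for r in right if l[0] == r[0])


def bucket_local_join(left_buckets, right_buckets):
    """Join bucket i of one side only with bucket i of the other side (no data exchange)."""
    result = []
    for b, left_rows in left_buckets.items():
        for l in left_rows:
            for r in right_buckets.get(b, []):
                if l[0] == r[0]:
                    result.append((l, r))
    return sorted(result)


t1 = [(i, f"a{i}") for i in range(50)]
t2 = [(i, f"b{i}") for i in range(0, 50, 3)]
same = bucket_local_join(distribute(t1, 0), distribute(t2, 0)) == global_join(t1, t2)
print(same)  # True: equal ids always land in the same bucket
```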
   
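   Step 3: colocate aggregation node
   Condition: the input partition (group-by columns) of the aggregation node must cover the data partition (distribution columns) of the child fragment, i.e. input partition >= data partition.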
   ```
   explain select k1, k2 from test_colocate where k1=1 group by k1, k2;
   +-------------------------------------------------------------------------------------------------------------------------+
   | Explain String                                                                                                          |
   +-------------------------------------------------------------------------------------------------------------------------+
   | PLAN FRAGMENT 0                                                                                                         |
   |  OUTPUT EXPRS:<slot 2> `k1` | <slot 3> `k2`                                                                             |
   |   PARTITION: UNPARTITIONED                                                                                              |
   |                                                                                                                         |
   |   RESULT SINK                                                                                                           |
   |                                                                                                                         |
   |   2:EXCHANGE                                                                                                            |
   |                                                                                                                         |
   | PLAN FRAGMENT 1                                                                                                         |
   |  OUTPUT EXPRS:                                                                                                          |
   |   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test_colocate`.`k1`, `default_cluster:test`.`test_colocate`.`k2` |
   |                                                                                                                         |
   |   STREAM DATA SINK                                                                                                      |
   |     EXCHANGE ID: 02                                                                                                     |
   |     UNPARTITIONED                                                                                                       |
   |                                                                                                                         |
   |   1:AGGREGATE (update finalize)                                                                                         |
   |   |  group by: `k1`, `k2`                                                                                               |
   |   |                                                                                                                     |
   |   0:OlapScanNode                                                                                                        |
   |      TABLE: test_colocate                                                                                               |
   +-------------------------------------------------------------------------------------------------------------------------+
   30 rows in set (0.011 sec)
   ```
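The aggregation condition can be read as a column-coverage check: if the child fragment is hash-partitioned on columns D, an aggregation grouping on columns G can stay in that fragment when every column of D appears in G, because rows with equal group keys then also have equal distribution keys and sit in the same bucket. This is a minimal sketch with a hypothetical helper, assuming columns are compared by name only (a real planner compares expressions and slots):

```python
from typing import Iterable


def can_colocate_agg(group_by: Iterable[str], data_partition: Iterable[str]) -> bool:
    """Hypothetical check: True if the group-by columns cover (are a superset of)
    the hash-distribution columns of the child fragment."""
    partition_cols = set(data_partition)
    # In this sketch an empty list models data with no hash distribution,
    # which gives no locality guarantee.
    return bool(partition_cols) and partition_cols <= set(group_by)


dist = ["k1", "k2"]  # DISTRIBUTED BY HASH(`k1`, `k2`) in test_colocate
print(can_colocate_agg(["k1", "k2"], dist))        # True: the Step 3 query
print(can_colocate_agg(["k1", "k2", "k3"], dist))  # True: extra group-by columns are fine
print(can_colocate_agg(["k1"], dist))              # False: rows with one k1 value can span buckets
```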
   
   Step 4: sort node (in window function)
   Condition: the sort columns of the sort node must cover the data partition of the child fragment (sort columns >= data partition).
   In the example below, the sort columns are k1 and k2.
   ```
   explain select k1, sum(k2) over(partition by k1 order by k2) from test_colocate;
   +-------------------------------------------------------------------------------------------------------------------------+
   | Explain String                                                                                                          |
   +-------------------------------------------------------------------------------------------------------------------------+
   | PLAN FRAGMENT 0                                                                                                         |
   |  OUTPUT EXPRS:<slot 4> <slot 0> | <slot 3>                                                                              |
   |   PARTITION: UNPARTITIONED                                                                                              |
   |                                                                                                                         |
   |   RESULT SINK                                                                                                           |
   |                                                                                                                         |
   |   3:EXCHANGE                                                                                                            |
   |                                                                                                                         |
   | PLAN FRAGMENT 1                                                                                                         |
   |  OUTPUT EXPRS:                                                                                                          |
   |   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`test_colocate`.`k1`, `default_cluster:test`.`test_colocate`.`k2` |
   |                                                                                                                         |
   |   STREAM DATA SINK                                                                                                      |
   |     EXCHANGE ID: 03                                                                                                     |
   |     UNPARTITIONED                                                                                                       |
   |                                                                                                                         |
   |   2:ANALYTIC                                                                                                            |
   |   |  functions: [, sum(<slot 5> ), ]                                                                                    |
   |   |  partition by: `k1`                                                                                                 |
   |   |  order by: <slot 5>  ASC                                                                                            |
   |   |  window: RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW                                                          |
   |   |                                                                                                                     |
   |   1:SORT                                                                                                                |
   |   |  order by: <slot 4> <slot 0> ASC, <slot 5>  ASC                                                                     |
   |   |  offset: 0                                                                                                          |
   |   |                                                                                                                     |
   |   0:OlapScanNode                                                                                                        |
   |      TABLE: test_colocate                                                                                               |
   +-------------------------------------------------------------------------------------------------------------------------+
   36 rows in set (0.028 sec)
   ```
   
   Colocate set operation node
   Condition 1: the data partition of every child fragment >= the input partition of the set operation node.
   Condition 2: no child fragment contains an exchange node; an exchange would mean the data has already been rehashed.
   Condition 3: all scan nodes are in the same colocate group.
   
   Step 1: create the tables
   ```
   CREATE TABLE `t1` (
     `id` int(11) COMMENT "",
     `value` varchar(8) COMMENT ""
   ) ENGINE=OLAP
   DUPLICATE KEY(`id`)
   DISTRIBUTED BY HASH(`id`) BUCKETS 10
   PROPERTIES (
     "colocate_with" = "t1",
     "replication_num" = "2"
   );

   CREATE TABLE `t2` (
     `id` int(11) COMMENT "",
     `value` varchar(8) COMMENT ""
   ) ENGINE=OLAP
   DUPLICATE KEY(`id`)
   DISTRIBUTED BY HASH(`id`) BUCKETS 10
   PROPERTIES (
     "colocate_with" = "t1",
     "replication_num" = "2"
   );

   CREATE TABLE `t3` (
     `id` int(11) COMMENT "",
     `value` varchar(8) COMMENT ""
   ) ENGINE=OLAP
   DUPLICATE KEY(`id`)
   DISTRIBUTED BY HASH(`id`) BUCKETS 10
   PROPERTIES (
     "colocate_with" = "t1",
     "replication_num" = "2"
   );
   ```
   
   Step 2: colocate query
   
   Tables t1, t2, and t3 are in the same colocate group, and the intersect column `id` is the same as the distribution column `id`.
   
   ```
   explain select id from t1 intersect select id from t2 intersect select id from t3;
   +----------------------------------------------------------------------------------+
   | Explain String                                                                   |
   +----------------------------------------------------------------------------------+
   | PLAN FRAGMENT 0                                                                  |
   |  OUTPUT EXPRS:<slot 3> `id` `id` `id`                                            |
   |   PARTITION: UNPARTITIONED                                                       |
   |                                                                                  |
   |   RESULT SINK                                                                    |
   |                                                                                  |
   |   4:EXCHANGE                                                                     |
   |                                                                                  |
   | PLAN FRAGMENT 1                                                                  |
   |  OUTPUT EXPRS:                                                                   |
   |   PARTITION: HASH_PARTITIONED: `default_cluster:test`.`t1`.`id`                  |
   |                                                                                  |
   |   STREAM DATA SINK                                                               |
   |     EXCHANGE ID: 04                                                              |
   |     UNPARTITIONED                                                                |
   |                                                                                  |
   |   0:INTERSECT                                                                    |
   |   |  colocate=true                                                               |
   |   |                                                                              |
   |   |----2:OlapScanNode                                                            |
   |   |       TABLE: t2                                                              |
   |   |                                                                              |
   |   |----3:OlapScanNode                                                            |
   |   |       TABLE: t3                                                              |
   |   |                                                                              |
   |   1:OlapScanNode                                                                 |
   |      TABLE: t1                                                                   |
   +----------------------------------------------------------------------------------+
   ```
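The three set-operation conditions can be summarized as a single predicate. This is a hypothetical Python sketch, not the Doris planner's code: the `ChildFragment` fields are invented for illustration, and Condition 1 is interpreted here as "every child's distribution columns appear among the columns it feeds into the set operation, at the same positions for all children", which is what lines up `id` across t1, t2, and t3.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ChildFragment:
    """Hypothetical summary of one input of a set operation node."""
    colocate_group: Optional[str]  # colocate group of the scanned table (None if none)
    dist_cols: List[str]           # hash-distribution columns of the scanned table
    output_cols: List[str]         # columns this child feeds into the set operation
    has_exchange: bool             # True if an exchange node rehashed the child's data


def can_colocate_set_op(children: List[ChildFragment]) -> bool:
    if not children:
        return False
    # Condition 3: every scan belongs to the same (non-empty) colocate group.
    groups = {c.colocate_group for c in children}
    if len(groups) != 1 or None in groups:
        return False
    # Condition 2: no child fragment contains an exchange node.
    if any(c.has_exchange for c in children):
        return False
    # Condition 1 (as interpreted here): the distribution columns are part of the
    # set-operation columns and sit at the same output positions in every child.
    alignments = set()
    for c in children:
        if not c.dist_cols or any(col not in c.output_cols for col in c.dist_cols):
            return False
        alignments.add(tuple(c.output_cols.index(col) for col in c.dist_cols))
    return len(alignments) == 1


# t1, t2 and t3 all use "colocate_with" = "t1" and are distributed by `id`.
by_id = [ChildFragment("t1", ["id"], ["id"], False) for _ in range(3)]
by_value = [ChildFragment("t1", ["id"], ["value"], False) for _ in range(3)]
print(can_colocate_set_op(by_id))     # True: the intersect query above
print(can_colocate_set_op(by_value))  # False: `value` does not cover the distribution column
```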
   
   
   **Additional context**
   
   The colocate plan also has some bad cases.
   For example, when the data is skewed, the query is more affected by the skew, because the colocate plan eliminates the rehash (shuffle) step.
   In this case, it is best to address the data skew first, for example by choosing a more suitable distribution column.
   Alternatively, colocate planning can be turned off through a session variable.
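Roughly speaking, under a colocate plan each bucket is processed locally by the instance that stores it, so the per-instance workload follows the stored row distribution directly. The following sketch, with a hypothetical bucketing hash and made-up row counts, shows how a single hot distribution key concentrates rows in one bucket:

```python
import zlib
from collections import Counter

BUCKETS = 10  # same bucket count as the example tables


def bucket_of(key: int) -> int:
    # Hypothetical bucketing hash; Doris's real hash function may differ.
    return zlib.crc32(repr(key).encode("utf-8")) % BUCKETS


def rows_per_bucket(keys):
    """Count how many rows each bucket receives for the given distribution keys."""
    return Counter(bucket_of(k) for k in keys)


# Made-up data: 900 rows share the hot key 42, 100 rows have distinct keys.
keys = [42] * 900 + list(range(1000, 1100))
counts = rows_per_bucket(keys)
hottest = max(counts.values())
print(hottest >= 900)  # True: one bucket holds at least 90% of all rows
```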
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] EmmyMiao87 commented on issue #5589: Colocate plan

EmmyMiao87 commented on issue #5589:
URL: https://github.com/apache/incubator-doris/issues/5589#issuecomment-810967221


   The feature in #5184 can resolve this issue.




[GitHub] [incubator-doris] morningman closed issue #5589: Colocate plan

morningman closed issue #5589:
URL: https://github.com/apache/incubator-doris/issues/5589


   

