Posted to issues@drill.apache.org by "Boaz Ben-Zvi (JIRA)" <ji...@apache.org> on 2018/11/27 22:57:00 UTC

[jira] [Commented] (DRILL-6836) Eliminate StreamingAggr for COUNT DISTINCT

    [ https://issues.apache.org/jira/browse/DRILL-6836?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16701142#comment-16701142 ] 

Boaz Ben-Zvi commented on DRILL-6836:
-------------------------------------

Regarding a parallel plan, in which multiple 2nd phase Hash-Aggr operators work in parallel and a single Streaming-Aggr at the root fragment returns the final count: this would +work the same way+. The actual plan (see below) places a Streaming-Aggr (01-01) on top of each 2nd phase Hash-Aggr (01-02) to perform the COUNT, and the root Streaming-Aggr (00-02) actually performs a SUM. Here is an example plan (trimmed for readability):
{code:java}
00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {}, id = 1698
00-01      Project(EXPR$0=[$0]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {}, id = 1697
00-02        StreamAgg(group=[{}], EXPR$0=[$SUM0($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {}, id = 1696
00-03          UnionExchange : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {}, id = 1695
01-01            StreamAgg(group=[{}], EXPR$0=[COUNT($0)]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 1.0, cumulative cost = {}, id = 1694
01-02              HashAgg(group=[{0}]) : rowType = RecordType(ANY full_name): rowcount = 1.0, cumulative cost = {}, id = 1693
01-03                Project(full_name=[$0]) : rowType = RecordType(ANY full_name): rowcount = 2.5, cumulative cost = {}, id = 1692
01-04                  HashToRandomExchange(dist0=[[$0]]) : rowType = RecordType(ANY full_name, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.5, cumulative cost = {}, id = 1691
02-01                    UnorderedMuxExchange : rowType = RecordType(ANY full_name, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 2.5, cumulative cost = {}, id = 1690
03-01                      Project(full_name=[$0], E_X_P=[hash32AsDouble($0, 1301011)]) : rowType = RecordType(ANY, ANY E_X_P): rowcount = 2.5, cumulative cost = {}, id = 1689
03-02                        HashAgg(group=[{0}]) : rowType = RecordType(ANY full_name): rowcount = 2.5, cumulative cost = {}, id = 1688
03-03                          Scan(table=[[]], groupscan=[EasyGroupScan [selectionRoot=file:, numFiles=9, files=[]) : rowType = : rowcount =, cumulative cost = {}, id = 1687

{code}
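
To illustrate why summing the per-fragment counts gives the exact result, here is a minimal sketch that models the plan's operators with plain Java collections. It is not Drill code: the class name, the method, and the sample data are all hypothetical, and it assumes only that the hash exchange sends equal values to the same partition, so the partitions' distinct sets never overlap.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class TwoPhaseCountDistinct {

    // Hypothetical helper (not a Drill API): models the plan with in-memory sets.
    static long countDistinct(List<String> values, int numPartitions) {
        List<Set<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new HashSet<>());
        }
        for (String v : values) {
            // HashToRandomExchange (01-04): equal values always hash to the same partition.
            int p = Math.floorMod(v.hashCode(), numPartitions);
            // 2nd phase HashAgg (01-02): the set keeps one entry per distinct key.
            partitions.get(p).add(v);
        }
        long total = 0;
        for (Set<String> partition : partitions) {
            // StreamAgg (01-01): COUNT of the distinct keys in this partition.
            long partialCount = partition.size();
            // Root StreamAgg (00-02): $SUM0 over the partial counts.
            total += partialCount;
        }
        return total;
    }

    public static void main(String[] args) {
        List<String> names = List.of("Ann", "Bob", "Ann", "Cid", "Bob", "Dee");
        System.out.println(countDistinct(names, 3)); // prints 4
    }
}
```

In this model each partial count is just the size of the partition's set, which is the number the Hash-Aggr already knows.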

> Eliminate StreamingAggr for COUNT DISTINCT
> ------------------------------------------
>
>                 Key: DRILL-6836
>                 URL: https://issues.apache.org/jira/browse/DRILL-6836
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators, Query Planning & Optimization
>    Affects Versions: 1.14.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>            Priority: Minor
>             Fix For: 1.16.0
>
>
> The COUNT DISTINCT operation is often implemented with a Hash-Aggr operator for the DISTINCT, and a Streaming-Aggr above to perform the COUNT.  That Streaming-Aggr does the counting like any aggregation, counting each value, batch after batch.
>   While very efficient, that counting work is basically not needed, as the Hash-Aggr knows the number of distinct values (in the in-memory partitions).
>   Hence _a possible small performance improvement_ - eliminate the Streaming-Aggr operator, and notify the Hash-Aggr to return a COUNT (these are Planner changes). The Hash-Aggr operator would need to generate the single Float8 column output schema, and output that batch with a single value, just like the Streaming-Aggr did (likely without generating code).
>   In case of a spill, the Hash-Aggr still needs to read and process those partitions, to get the exact distinct number.
>    The expected improvement is the elimination of the batch by batch output from the Hash-Aggr, and the batch by batch, row by row processing of the Streaming-Aggr.
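
The idea in the description can be sketched roughly as follows. This is a hypothetical model, not the Hash-Aggr implementation: it assumes each key belongs to exactly one partition, that an in-memory partition already holds only distinct keys, and that a spilled partition is a list of raw values that may still contain duplicates and so must be read back and re-aggregated.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class CountingHashAggSketch {

    // Hypothetical model of a Hash-Aggr that returns the count itself
    // instead of emitting its distinct keys batch by batch.
    static long countDistinct(List<Set<String>> inMemoryPartitions,
                              List<List<String>> spilledPartitions) {
        long count = 0;
        // In-memory partitions: the number of distinct keys is already known.
        for (Set<String> partition : inMemoryPartitions) {
            count += partition.size();
        }
        // Spilled partitions: read back and de-duplicate to get the exact number.
        for (List<String> spilled : spilledPartitions) {
            count += new HashSet<>(spilled).size();
        }
        return count;
    }

    public static void main(String[] args) {
        List<Set<String>> inMemory = List.of(Set.of("Ann", "Bob"), Set.of("Cid"));
        List<List<String>> spilled = List.of(List.of("Dee", "Eve", "Dee"));
        System.out.println(countDistinct(inMemory, spilled)); // prints 5
    }
}
```

Only the single count leaves the operator in this sketch, which is the batch-by-batch output and row-by-row counting the description proposes to avoid.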



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)