You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@drill.apache.org by "Aman Sinha (JIRA)" <ji...@apache.org> on 2015/04/21 22:18:58 UTC

[jira] [Commented] (DRILL-2840) Duplicate HashAgg operator seen in physical plan for aggregate & grouping query

    [ https://issues.apache.org/jira/browse/DRILL-2840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14505673#comment-14505673 ] 

Aman Sinha commented on DRILL-2840:
-----------------------------------

For case 1, the 2 hash consecutive hash aggs happen because we initially create a 2 phase aggregate which has an exchange between them and then  based on estimated row count of the input we decide to remove the exchange if the row count is smaller than slice_target.   Thus, functionally having the 2 hash aggs is fine, although one could generate the plan with one for better performance. In that sense, it would be an improvement, not a bug. 

For case 2, again the successive hashaggs are doing group-by on different columns, so that's ok but the query hang needs to be investigated.  Can you provide the drillbit.log file when this query was run ?  It is possible there was an error condition that did not get propagated. 


> Duplicate HashAgg operator seen in physical plan for aggregate & grouping query
> -------------------------------------------------------------------------------
>
>                 Key: DRILL-2840
>                 URL: https://issues.apache.org/jira/browse/DRILL-2840
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 0.9.0
>            Reporter: Khurram Faraaz
>            Assignee: Jinfeng Ni
>
> Test was executed on 4 node cluster on CentOS.
> {code}
> Case 1) We need to know why there is an additional (HashAgg(group=[{0, 1}])) operator right after the Scan is done, in the physical plan.
> 0: jdbc:drill:> select max( distinct key1 ) maximum, count( distinct key1 ) count_key1, sum( distinct key1 ) sum_key1, min( distinct key1 ) minimum, avg( distinct key1 ) average, key2 from `twoKeyJsn.json` group by key2 order by key2;
> +------------+------------+------------+------------+------------+------------+
> |  maximum   | count_key1 |  sum_key1  |  minimum   |  average   |    key2    |
> +------------+------------+------------+------------+------------+------------+
> | 1.40095133379E9 | 156        | 1.1920469973999657E11 | 2.39793089027E7 | 7.641326906410036E8 | 0          |
> | 1.42435032101E9 | 180        | 1.2884789516328592E11 | 8659240.29442 | 7.158216397960329E8 | 1          |
> | 1.42931626355E9 | 1872281    | 1.3386731804571605E15 | 618.939623926 | 7.149958689198686E8 | a          |
> | 1.42931347924E9 | 1870676    | 1.3371603128280032E15 | 108.851943741 | 7.14800592314224E8 | b          |
> | 1.42931336919E9 | 1871847    | 1.337837189079748E15 | 3018.47312743 | 7.147150323075273E8 | c          |
> | 1.42931380603E9 | 1870697    | 1.3362335178170852E15 | 3890.92180463 | 7.142971404867198E8 | d          |
> | 1.42931281008E9 | 1871507    | 1.3367368067327902E15 | 1165.48741414 | 7.142569099302275E8 | e          |
> | 1.42931480081E9 | 1870450    | 1.3358301916601862E15 | 354.577534881 | 7.14175835579773E8 | f          |
> | 1.42931509068E9 | 1873604    | 1.3389171286500712E15 | 889.584888053 | 7.146211945801094E8 | g          |
> | 1.42931553374E9 | 1872726    | 1.3393592500619982E15 | 2704.34813594 | 7.151923186104097E8 | h          |
> | 1.42931450347E9 | 1872434    | 1.3381712881732795E15 | 122.281412463 | 7.146694025921766E8 | i          |
> | 1.42931539751E9 | 1872250    | 1.3380216282921535E15 | 946.21365677 | 7.146597026530397E8 | j          |
> | 1.42931334853E9 | 1873923    | 1.3390341356271005E15 | 1070.7862089 | 7.145619834043877E8 | k          |
> | 1.42931539809E9 | 1870929    | 1.3371605654647945E15 | 55.1144569856 | 7.147040670516062E8 | l          |
> | 1.42931543226E9 | 1874172    | 1.339322148620916E15 | 858.05505376 | 7.146207224421856E8 | m          |
> | 1.42931595791E9 | 1874462    | 1.3391024723756562E15 | 237.230716926 | 7.143929684227561E8 | n          |
> +------------+------------+------------+------------+------------+------------+
> 16 rows selected (103.566 seconds)
> 0: jdbc:drill:> explain plan for select max( distinct key1 ) maximum, count( distinct key1 ) count_key1, sum( distinct key1 ) sum_key1, min( distinct key1 ) minimum, avg( distinct key1 ) average, key2 from `twoKeyJsn.json` group by key2 order by key2;
> +------------+------------+
> |    text    |    json    |
> +------------+------------+
> | 00-00    Screen
> 00-01      Project(maximum=[$0], count_key1=[$1], sum_key1=[$2], minimum=[$3], average=[$4], key2=[$5])
> 00-02        SelectionVectorRemover
> 00-03          Sort(sort0=[$5], dir0=[ASC])
> 00-04            Project(maximum=[$1], count_key1=[$2], sum_key1=[CASE(=($2, 0), null, $3)], minimum=[$4], average=[CAST(/(CastHigh(CASE(=($2, 0), null, $3)), $2)):ANY NOT NULL], key2=[$0])
> 00-05              HashAgg(group=[{0}], maximum=[MAX($1)], count_key1=[COUNT($1)], agg#2=[$SUM0($1)], minimum=[MIN($1)])
> 00-06                HashAgg(group=[{0, 1}])
> 00-07                  HashAgg(group=[{0, 1}])
> 00-08                    Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/twoKeyJsn.json, numFiles=1, columns=[`key2`, `key1`], files=[maprfs:/tmp/twoKeyJsn.json]]])
> config options related to hashing and aggregation were set to,
> 0: jdbc:drill:> select * from sys.options where name like '%agg%';
> +------------+------------+------------+------------+------------+------------+------------+
> |    name    |    kind    |    type    |  num_val   | string_val |  bool_val  | float_val  |
> +------------+------------+------------+------------+------------+------------+------------+
> | planner.enable_multiphase_agg | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.enable_streamagg | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.enable_hashagg | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.memory.hash_agg_table_factor | DOUBLE     | SYSTEM     | null       | null       | null       | 1.1        |
> +------------+------------+------------+------------+------------+------------+------------+
> 4 rows selected (0.203 seconds)
> 0: jdbc:drill:> select * from sys.options where name like '%hash%';
> +------------+------------+------------+------------+------------+------------+------------+
> |    name    |    kind    |    type    |  num_val   | string_val |  bool_val  | float_val  |
> +------------+------------+------------+------------+------------+------------+------------+
> | planner.enable_hash_single_key | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.join.hash_join_swap_margin_factor | DOUBLE     | SYSTEM     | null       | null       | null       | 10.0       |
> | exec.max_hash_table_size | LONG       | SYSTEM     | 1073741824 | null       | null       | null       |
> | planner.enable_hashjoin_swap | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | exec.min_hash_table_size | LONG       | SYSTEM     | 65536      | null       | null       | null       |
> | planner.enable_hashagg | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.memory.hash_agg_table_factor | DOUBLE     | SYSTEM     | null       | null       | null       | 1.1        |
> | planner.memory.hash_join_table_factor | DOUBLE     | SYSTEM     | null       | null       | null       | 1.1        |
> | planner.enable_hashjoin | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> +------------+------------+------------+------------+------------+------------+------------+
> 9 rows selected (0.144 seconds)
> {code}
> Here is another (similar) aggregate and grouping query that hung forever.
> {code}
> case 2) This aggregate and grouping query hangs indefinitely forever...
>  input is from CSV file and it has two columns of data. It is running on 4 node cluster on CentOS. Data file has close to 26 million records in it.
> 0: jdbc:drill:> select count(*) from (select max( distinct cast(columns[0] as double) ) maximum, count( distinct cast(columns[0] as double) ) count_key1, sum( distinct cast(columns[0] as double)) sum_key1, min( distinct cast(columns[0] as double)) minimum, avg( distinct cast(columns[0] as double)) average, columns[1] from `tblfrmJsnToCSV/0_0_0.csv` where columns[0] <> 'key1' group by columns[1]);
> Physical plan for the query is
> 00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 22.877799999999997, cumulative cost = {2176138.623779999 rows, 7886477.503779999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74019
> 00-01      StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 22.877799999999997, cumulative cost = {2176136.335999999 rows, 7886475.215999999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74018
> 00-02        Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 228.77799999999996, cumulative cost = {2175907.5579999993 rows, 7883729.879999999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74017
> 00-03          HashAgg(group=[{0}], maximum=[MAX($1)], count_key1=[COUNT($1)], agg#2=[$SUM0($1)], minimum=[MIN($1)]) : rowType = RecordType(ANY EXPR$5, DOUBLE maximum, BIGINT count_key1, DOUBLE $f3, DOUBLE minimum): rowcount = 228.77799999999996, cumulative cost = {2175678.7799999993 rows, 7883725.879999999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74016
> 00-04            HashAgg(group=[{0, 1}]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 2287.7799999999997, cumulative cost = {2173390.9999999995 rows, 7755610.199999999 cpu, 0.0 io, 2.811224064E8 network, 6643713.12 memory}, id = 74015
> 00-05              Project(EXPR$5=[$0], $f1=[$1]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 22877.8, cumulative cost = {2150513.1999999997 rows, 7389565.399999999 cpu, 0.0 io, 2.811224064E8 network, 6039739.2 memory}, id = 74014
> 00-06                HashToRandomExchange(dist0=[[$0]], dist1=[[$1]]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 22877.8, cumulative cost = {2127635.4 rows, 7389557.399999999 cpu, 0.0 io, 2.811224064E8 network, 6039739.2 memory}, id = 74013
> 01-01                  UnorderedMuxExchange : rowType = RecordType(ANY EXPR$5, DOUBLE $f1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 22877.8, cumulative cost = {2104757.6 rows, 7115023.8 cpu, 0.0 io, 0.0 network, 6039739.2 memory}, id = 74012
> 02-01                    Project(EXPR$5=[$0], $f1=[$1], E_X_P_R_H_A_S_H_F_I_E_L_D=[castINT(hash64($1, hash64($0)))]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 22877.8, cumulative cost = {2081879.8 rows, 7092146.0 cpu, 0.0 io, 0.0 network, 6039739.2 memory}, id = 74011
> 02-02                      HashAgg(group=[{0, 1}]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 22877.8, cumulative cost = {2059002.0 rows, 7092134.0 cpu, 0.0 io, 0.0 network, 6039739.2 memory}, id = 74010
> 02-03                        Project(EXPR$5=[$0], $f1=[CAST($1):DOUBLE]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 228778.0, cumulative cost = {1830224.0 rows, 3431686.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74009
> 02-04                          SelectionVectorRemover : rowType = RecordType(ANY ITEM, ANY ITEM1): rowcount = 228778.0, cumulative cost = {1601446.0 rows, 3431678.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74008
> 02-05                            Filter(condition=[<>($1, 'key1')]) : rowType = RecordType(ANY ITEM, ANY ITEM1): rowcount = 228778.0, cumulative cost = {1372668.0 rows, 3202900.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74007
> 02-06                              Project(ITEM=[ITEM($0, 1)], ITEM1=[ITEM($0, 0)]) : rowType = RecordType(ANY ITEM, ANY ITEM1): rowcount = 457556.0, cumulative cost = {915112.0 rows, 457564.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74006
> 02-07                                Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/tblfrmJsnToCSV/0_0_0.csv, numFiles=1, columns=[`columns`[1], `columns`[0]], files=[maprfs:/tmp/tblfrmJsnToCSV/0_0_0.csv]]]) : rowType = RecordType(ANY columns): rowcount = 457556.0, cumulative cost = {457556.0 rows, 457556.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74005
> Sample data from the data file
> [root@centos-01 logs]# hadoop fs -cat /tmp/tblfrmJsnToCSV/0_0_0.csv | more
> key1,key2
> 1.2968152673E9,d
> 4.67365529012E7,c
> 9.39682065896E7,b
> 1.01580172933E9,d
> 4.98788888641E8,1
> 1.52391833107E8,1
> 7.31290386917E8,a
> 6.92726688161E8,d
> 1.12383522654E9,a
> 1.26807240856E8,1
> 9.54482542201E8,1
> 1.32100398388E9,0
> 1.17405537683E9,a
> 3.49879149097E7,0
> 6.50489380899E7,b
> 1.00841781109E9,a
> 1.19199684011E9,c
> 1.88765338328E8,b
> 8.24243579027E8,a
> 7.03797780195E8,b
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)