Posted to issues@drill.apache.org by "Sean Hsuan-Yi Chu (JIRA)" <ji...@apache.org> on 2015/04/22 00:05:00 UTC

[jira] [Updated] (DRILL-2840) Duplicate HashAgg operator seen in physical plan for aggregate & grouping query

     [ https://issues.apache.org/jira/browse/DRILL-2840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Hsuan-Yi Chu updated DRILL-2840:
-------------------------------------
    Assignee: Sean Hsuan-Yi Chu  (was: Jinfeng Ni)

> Duplicate HashAgg operator seen in physical plan for aggregate & grouping query
> -------------------------------------------------------------------------------
>
>                 Key: DRILL-2840
>                 URL: https://issues.apache.org/jira/browse/DRILL-2840
>             Project: Apache Drill
>          Issue Type: Bug
>          Components: Query Planning & Optimization
>    Affects Versions: 0.9.0
>            Reporter: Khurram Faraaz
>            Assignee: Sean Hsuan-Yi Chu
>
> The test was executed on a 4-node cluster running CentOS.
> {code}
> Case 1) We need to know why the physical plan contains an additional HashAgg(group=[{0, 1}]) operator immediately after the Scan.
> 0: jdbc:drill:> select max( distinct key1 ) maximum, count( distinct key1 ) count_key1, sum( distinct key1 ) sum_key1, min( distinct key1 ) minimum, avg( distinct key1 ) average, key2 from `twoKeyJsn.json` group by key2 order by key2;
> +------------+------------+------------+------------+------------+------------+
> |  maximum   | count_key1 |  sum_key1  |  minimum   |  average   |    key2    |
> +------------+------------+------------+------------+------------+------------+
> | 1.40095133379E9 | 156        | 1.1920469973999657E11 | 2.39793089027E7 | 7.641326906410036E8 | 0          |
> | 1.42435032101E9 | 180        | 1.2884789516328592E11 | 8659240.29442 | 7.158216397960329E8 | 1          |
> | 1.42931626355E9 | 1872281    | 1.3386731804571605E15 | 618.939623926 | 7.149958689198686E8 | a          |
> | 1.42931347924E9 | 1870676    | 1.3371603128280032E15 | 108.851943741 | 7.14800592314224E8 | b          |
> | 1.42931336919E9 | 1871847    | 1.337837189079748E15 | 3018.47312743 | 7.147150323075273E8 | c          |
> | 1.42931380603E9 | 1870697    | 1.3362335178170852E15 | 3890.92180463 | 7.142971404867198E8 | d          |
> | 1.42931281008E9 | 1871507    | 1.3367368067327902E15 | 1165.48741414 | 7.142569099302275E8 | e          |
> | 1.42931480081E9 | 1870450    | 1.3358301916601862E15 | 354.577534881 | 7.14175835579773E8 | f          |
> | 1.42931509068E9 | 1873604    | 1.3389171286500712E15 | 889.584888053 | 7.146211945801094E8 | g          |
> | 1.42931553374E9 | 1872726    | 1.3393592500619982E15 | 2704.34813594 | 7.151923186104097E8 | h          |
> | 1.42931450347E9 | 1872434    | 1.3381712881732795E15 | 122.281412463 | 7.146694025921766E8 | i          |
> | 1.42931539751E9 | 1872250    | 1.3380216282921535E15 | 946.21365677 | 7.146597026530397E8 | j          |
> | 1.42931334853E9 | 1873923    | 1.3390341356271005E15 | 1070.7862089 | 7.145619834043877E8 | k          |
> | 1.42931539809E9 | 1870929    | 1.3371605654647945E15 | 55.1144569856 | 7.147040670516062E8 | l          |
> | 1.42931543226E9 | 1874172    | 1.339322148620916E15 | 858.05505376 | 7.146207224421856E8 | m          |
> | 1.42931595791E9 | 1874462    | 1.3391024723756562E15 | 237.230716926 | 7.143929684227561E8 | n          |
> +------------+------------+------------+------------+------------+------------+
> 16 rows selected (103.566 seconds)
> 0: jdbc:drill:> explain plan for select max( distinct key1 ) maximum, count( distinct key1 ) count_key1, sum( distinct key1 ) sum_key1, min( distinct key1 ) minimum, avg( distinct key1 ) average, key2 from `twoKeyJsn.json` group by key2 order by key2;
> +------------+------------+
> |    text    |    json    |
> +------------+------------+
> | 00-00    Screen
> 00-01      Project(maximum=[$0], count_key1=[$1], sum_key1=[$2], minimum=[$3], average=[$4], key2=[$5])
> 00-02        SelectionVectorRemover
> 00-03          Sort(sort0=[$5], dir0=[ASC])
> 00-04            Project(maximum=[$1], count_key1=[$2], sum_key1=[CASE(=($2, 0), null, $3)], minimum=[$4], average=[CAST(/(CastHigh(CASE(=($2, 0), null, $3)), $2)):ANY NOT NULL], key2=[$0])
> 00-05              HashAgg(group=[{0}], maximum=[MAX($1)], count_key1=[COUNT($1)], agg#2=[$SUM0($1)], minimum=[MIN($1)])
> 00-06                HashAgg(group=[{0, 1}])
> 00-07                  HashAgg(group=[{0, 1}])
> 00-08                    Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/twoKeyJsn.json, numFiles=1, columns=[`key2`, `key1`], files=[maprfs:/tmp/twoKeyJsn.json]]])
> Config options related to hashing and aggregation were set as follows:
> 0: jdbc:drill:> select * from sys.options where name like '%agg%';
> +------------+------------+------------+------------+------------+------------+------------+
> |    name    |    kind    |    type    |  num_val   | string_val |  bool_val  | float_val  |
> +------------+------------+------------+------------+------------+------------+------------+
> | planner.enable_multiphase_agg | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.enable_streamagg | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.enable_hashagg | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.memory.hash_agg_table_factor | DOUBLE     | SYSTEM     | null       | null       | null       | 1.1        |
> +------------+------------+------------+------------+------------+------------+------------+
> 4 rows selected (0.203 seconds)
> 0: jdbc:drill:> select * from sys.options where name like '%hash%';
> +------------+------------+------------+------------+------------+------------+------------+
> |    name    |    kind    |    type    |  num_val   | string_val |  bool_val  | float_val  |
> +------------+------------+------------+------------+------------+------------+------------+
> | planner.enable_hash_single_key | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.join.hash_join_swap_margin_factor | DOUBLE     | SYSTEM     | null       | null       | null       | 10.0       |
> | exec.max_hash_table_size | LONG       | SYSTEM     | 1073741824 | null       | null       | null       |
> | planner.enable_hashjoin_swap | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | exec.min_hash_table_size | LONG       | SYSTEM     | 65536      | null       | null       | null       |
> | planner.enable_hashagg | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> | planner.memory.hash_agg_table_factor | DOUBLE     | SYSTEM     | null       | null       | null       | 1.1        |
> | planner.memory.hash_join_table_factor | DOUBLE     | SYSTEM     | null       | null       | null       | 1.1        |
> | planner.enable_hashjoin | BOOLEAN    | SYSTEM     | null       | null       | true       | null       |
> +------------+------------+------------+------------+------------+------------+------------+
> 9 rows selected (0.144 seconds)
> {code}
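The duplicated operator in Case 1 can also be spotted mechanically. The following is a minimal sketch, assuming the plan is available as the text column of EXPLAIN PLAN output and that the plan is a linear chain of operators, so that consecutive lines are parent and child. The names PLAN_LINE, strip_cost and adjacent_duplicates are hypothetical helpers written for this illustration; they are not part of Drill.

```python
import re

# Matches lines such as "00-06                HashAgg(group=[{0, 1}])".
# The optional leading ">" or "|" covers the quoting seen in this report.
PLAN_LINE = re.compile(r"^[>|\s]*(\d{2}-\d{2})\s+(.*?)\s*$")


def strip_cost(op):
    """Drop the ' : rowType = ...' suffix that verbose plans append."""
    return op.split(" : rowType", 1)[0].strip()


def adjacent_duplicates(plan_text):
    """Return (upper_id, lower_id, operator) for consecutive identical operators."""
    ops = []
    for line in plan_text.splitlines():
        m = PLAN_LINE.match(line)
        if m:
            ops.append((m.group(1), strip_cost(m.group(2))))
    return [
        (ops[i][0], ops[i + 1][0], ops[i][1])
        for i in range(len(ops) - 1)
        if ops[i][1] == ops[i + 1][1]
    ]


# Excerpt of the Case 1 plan, copied from the report above.
CASE_1_PLAN = """\
00-05              HashAgg(group=[{0}], maximum=[MAX($1)], count_key1=[COUNT($1)], agg#2=[$SUM0($1)], minimum=[MIN($1)])
00-06                HashAgg(group=[{0, 1}])
00-07                  HashAgg(group=[{0, 1}])
00-08                    Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/twoKeyJsn.json, numFiles=1, columns=[`key2`, `key1`], files=[maprfs:/tmp/twoKeyJsn.json]]])
"""

print(adjacent_duplicates(CASE_1_PLAN))
# prints [('00-06', '00-07', 'HashAgg(group=[{0, 1}])')]
```

The check only compares neighbouring lines, so two identical aggregates separated by an exchange or a projection would not be flagged.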
> Here is another (similar) aggregate and grouping query that hung forever.
> {code}
> Case 2) This aggregate and grouping query hangs indefinitely.
> The input is a CSV file with two columns of data. The query runs on a 4-node cluster on CentOS. The data file has close to 26 million records.
> 0: jdbc:drill:> select count(*) from (select max( distinct cast(columns[0] as double) ) maximum, count( distinct cast(columns[0] as double) ) count_key1, sum( distinct cast(columns[0] as double)) sum_key1, min( distinct cast(columns[0] as double)) minimum, avg( distinct cast(columns[0] as double)) average, columns[1] from `tblfrmJsnToCSV/0_0_0.csv` where columns[0] <> 'key1' group by columns[1]);
> The physical plan for the query is:
> 00-00    Screen : rowType = RecordType(BIGINT EXPR$0): rowcount = 22.877799999999997, cumulative cost = {2176138.623779999 rows, 7886477.503779999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74019
> 00-01      StreamAgg(group=[{}], EXPR$0=[COUNT()]) : rowType = RecordType(BIGINT EXPR$0): rowcount = 22.877799999999997, cumulative cost = {2176136.335999999 rows, 7886475.215999999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74018
> 00-02        Project($f0=[0]) : rowType = RecordType(INTEGER $f0): rowcount = 228.77799999999996, cumulative cost = {2175907.5579999993 rows, 7883729.879999999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74017
> 00-03          HashAgg(group=[{0}], maximum=[MAX($1)], count_key1=[COUNT($1)], agg#2=[$SUM0($1)], minimum=[MIN($1)]) : rowType = RecordType(ANY EXPR$5, DOUBLE maximum, BIGINT count_key1, DOUBLE $f3, DOUBLE minimum): rowcount = 228.77799999999996, cumulative cost = {2175678.7799999993 rows, 7883725.879999999 cpu, 0.0 io, 2.811224064E8 network, 6683978.048 memory}, id = 74016
> 00-04            HashAgg(group=[{0, 1}]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 2287.7799999999997, cumulative cost = {2173390.9999999995 rows, 7755610.199999999 cpu, 0.0 io, 2.811224064E8 network, 6643713.12 memory}, id = 74015
> 00-05              Project(EXPR$5=[$0], $f1=[$1]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 22877.8, cumulative cost = {2150513.1999999997 rows, 7389565.399999999 cpu, 0.0 io, 2.811224064E8 network, 6039739.2 memory}, id = 74014
> 00-06                HashToRandomExchange(dist0=[[$0]], dist1=[[$1]]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 22877.8, cumulative cost = {2127635.4 rows, 7389557.399999999 cpu, 0.0 io, 2.811224064E8 network, 6039739.2 memory}, id = 74013
> 01-01                  UnorderedMuxExchange : rowType = RecordType(ANY EXPR$5, DOUBLE $f1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 22877.8, cumulative cost = {2104757.6 rows, 7115023.8 cpu, 0.0 io, 0.0 network, 6039739.2 memory}, id = 74012
> 02-01                    Project(EXPR$5=[$0], $f1=[$1], E_X_P_R_H_A_S_H_F_I_E_L_D=[castINT(hash64($1, hash64($0)))]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1, ANY E_X_P_R_H_A_S_H_F_I_E_L_D): rowcount = 22877.8, cumulative cost = {2081879.8 rows, 7092146.0 cpu, 0.0 io, 0.0 network, 6039739.2 memory}, id = 74011
> 02-02                      HashAgg(group=[{0, 1}]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 22877.8, cumulative cost = {2059002.0 rows, 7092134.0 cpu, 0.0 io, 0.0 network, 6039739.2 memory}, id = 74010
> 02-03                        Project(EXPR$5=[$0], $f1=[CAST($1):DOUBLE]) : rowType = RecordType(ANY EXPR$5, DOUBLE $f1): rowcount = 228778.0, cumulative cost = {1830224.0 rows, 3431686.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74009
> 02-04                          SelectionVectorRemover : rowType = RecordType(ANY ITEM, ANY ITEM1): rowcount = 228778.0, cumulative cost = {1601446.0 rows, 3431678.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74008
> 02-05                            Filter(condition=[<>($1, 'key1')]) : rowType = RecordType(ANY ITEM, ANY ITEM1): rowcount = 228778.0, cumulative cost = {1372668.0 rows, 3202900.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74007
> 02-06                              Project(ITEM=[ITEM($0, 1)], ITEM1=[ITEM($0, 0)]) : rowType = RecordType(ANY ITEM, ANY ITEM1): rowcount = 457556.0, cumulative cost = {915112.0 rows, 457564.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74006
> 02-07                                Scan(groupscan=[EasyGroupScan [selectionRoot=/tmp/tblfrmJsnToCSV/0_0_0.csv, numFiles=1, columns=[`columns`[1], `columns`[0]], files=[maprfs:/tmp/tblfrmJsnToCSV/0_0_0.csv]]]) : rowType = RecordType(ANY columns): rowcount = 457556.0, cumulative cost = {457556.0 rows, 457556.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 74005
> Sample data from the data file:
> [root@centos-01 logs]# hadoop fs -cat /tmp/tblfrmJsnToCSV/0_0_0.csv | more
> key1,key2
> 1.2968152673E9,d
> 4.67365529012E7,c
> 9.39682065896E7,b
> 1.01580172933E9,d
> 4.98788888641E8,1
> 1.52391833107E8,1
> 7.31290386917E8,a
> 6.92726688161E8,d
> 1.12383522654E9,a
> 1.26807240856E8,1
> 9.54482542201E8,1
> 1.32100398388E9,0
> 1.17405537683E9,a
> 3.49879149097E7,0
> 6.50489380899E7,b
> 1.00841781109E9,a
> 1.19199684011E9,c
> 1.88765338328E8,b
> 8.24243579027E8,a
> 7.03797780195E8,b
> {code}
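
To illustrate why a second HashAgg(group=[{0, 1}]) placed directly on top of the first is redundant, here is a toy model in Python. It is not Drill's implementation. It assumes that HashAgg(group=[{0, 1}]) keeps each distinct (key2, key1) pair once and that the upper HashAgg(group=[{0}], ...) then aggregates per key2. The functions dedupe and aggregate are hypothetical names for this sketch; the rows are the 20 sample records listed above.

```python
import math
from collections import defaultdict

# The 20 sample rows from the data file, as (key1, key2) pairs.
ROWS = [
    (1.2968152673E9, "d"), (4.67365529012E7, "c"),
    (9.39682065896E7, "b"), (1.01580172933E9, "d"),
    (4.98788888641E8, "1"), (1.52391833107E8, "1"),
    (7.31290386917E8, "a"), (6.92726688161E8, "d"),
    (1.12383522654E9, "a"), (1.26807240856E8, "1"),
    (9.54482542201E8, "1"), (1.32100398388E9, "0"),
    (1.17405537683E9, "a"), (3.49879149097E7, "0"),
    (6.50489380899E7, "b"), (1.00841781109E9, "a"),
    (1.19199684011E9, "c"), (1.88765338328E8, "b"),
    (8.24243579027E8, "a"), (7.03797780195E8, "b"),
]


def dedupe(pairs):
    """Toy model of HashAgg(group=[{0, 1}]): keep each (key2, key1) pair once."""
    return set(pairs)


def aggregate(pairs):
    """Toy model of HashAgg(group=[{0}], MAX, COUNT, $SUM0, MIN) per key2."""
    groups = defaultdict(list)
    for key2, key1 in pairs:
        groups[key2].append(key1)
    return {
        key2: {
            "maximum": max(values),
            "count_key1": len(values),
            # fsum is order-independent, so set iteration order cannot matter.
            "sum_key1": math.fsum(values),
            "minimum": min(values),
        }
        for key2, values in groups.items()
    }


# Column order in the plan is (key2, key1).
pairs = [(key2, key1) for key1, key2 in ROWS]

once = aggregate(dedupe(pairs))
twice = aggregate(dedupe(dedupe(pairs)))

print(once == twice)
# prints True
```

Because deduplication is idempotent, applying it twice in a row changes nothing in the result; the extra operator only adds hashing work. The average is left out of the model because the plan derives it from the sum and the count in a later Project.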



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)