Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:02:22 UTC

[jira] [Updated] (SPARK-21964) Enable splitting the Aggregate (on Expand) into a number of Aggregates for grouping analytics

     [ https://issues.apache.org/jira/browse/SPARK-21964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-21964:
---------------------------------
    Labels: bulk-closed  (was: )

> Enable splitting the Aggregate (on Expand) into a number of Aggregates for grouping analytics
> ---------------------------------------------------------------------------------------------
>
>                 Key: SPARK-21964
>                 URL: https://issues.apache.org/jira/browse/SPARK-21964
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 2.0.0, 2.0.2, 2.1.0, 2.1.1, 2.2.0
>            Reporter: Feng Zhu
>            Priority: Major
>              Labels: bulk-closed
>         Attachments: OOMRetry.png, Union.png, before.png
>
>
> In current versions, Spark SQL implements grouping analytics clauses (i.e., cube, rollup and grouping sets) as a single Aggregate operator on top of a single Expand operator. With this implementation, the table is read only once.
> However, in many scenarios (e.g., high-dimensional cubes), the Expand operator becomes too "heavy" because of its large number of projections, resulting in a vast amount of shuffle write.
> In our production environment, we have encountered various such cases, leading to poor performance or even OOM errors in direct buffer memory. We demonstrate the issue with the following real-world query over a 6-dimensional cube.
>
> {code:sql}
> SELECT CASE WHEN grouping(iFrom) = 1 THEN -1 ELSE iFrom END AS iFrom
>       ,CASE WHEN grouping(iSrcId) = 1 THEN -1 ELSE iSrcId END AS iSrcId
>       ,CASE WHEN grouping(sgametype) = 1 THEN '-1' ELSE sgametype END AS sgametype
>       ,CASE WHEN grouping(iOperId) = 1 THEN -1 ELSE iOperId END AS iOperId
>       ,CASE WHEN grouping(igameid) = 1 THEN -1 ELSE igameid END AS igameid
>       ,CASE WHEN grouping(iacttypeid) = 1 THEN -1 ELSE iacttypeid END AS iacttypeid
>       ,SUM(iclickcnt) AS iclickcnt
> FROM p_day_advert
> WHERE  statedate = 20170810
> GROUP BY iFrom, iSrcId, sgametype, iOperId, igameid, iacttypeid WITH CUBE
> {code}
> For such a query, the Expand operator generates 64 (i.e., 2^6) projections. Although the query reads only about 3GB of data, it produces about 250GB of shuffle write. In our environment, the first stage takes about 2 hours.
> !https://issues.apache.org/jira/secure/attachment/12886247/before.png!
> The second stage is prone to OOM errors unless we enlarge some configurations.
> !https://issues.apache.org/jira/secure/attachment/12886246/OOMRetry.png!
> Therefore, we propose an alternative that splits the heavyweight aggregate into a number of lightweight aggregates, one per grouping set. In effect, it implements grouping analytics as a Union and executes the aggregates one by one. Although this reads the data many times, it still achieves high overall performance. With this implementation, the query completes in about 20 minutes, with each aggregation taking 1 to 4 minutes.
> !https://issues.apache.org/jira/secure/attachment/12886245/Union.png!
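
The current plan described in the issue (one Expand feeding one Aggregate) can be illustrated with a small, simplified Python model. This is a hedged sketch, not Spark's implementation: the helper names (cube_grouping_sets, expand, aggregate_on_expand) are hypothetical, the grouping ID is simply the index of the grouping set rather than Spark's own encoding, a dict stands in for the shuffle plus hash aggregate, and the toy rows are made up.

{code:python}
from collections import defaultdict
from itertools import combinations


def cube_grouping_sets(columns):
    """Every subset of the grouping columns, which is what WITH CUBE expands to."""
    return [set(subset)
            for k in range(len(columns), -1, -1)
            for subset in combinations(columns, k)]


def expand(rows, columns, grouping_sets):
    """Simplified Expand: emit one projection per (input row, grouping set).

    Columns outside the grouping set are replaced by None, and the index of
    the grouping set (a stand-in for a grouping ID) keeps the sets apart."""
    for row in rows:
        for gid, gset in enumerate(grouping_sets):
            key = tuple(row[c] if c in gset else None for c in columns)
            yield key, gid, row


def aggregate_on_expand(rows, columns, measure):
    """Single Aggregate over the Expand output: SUM(measure) per (key, gid)."""
    sums = defaultdict(int)
    for key, gid, row in expand(rows, columns, cube_grouping_sets(columns)):
        sums[(key, gid)] += row[measure]
    return dict(sums)


# Toy input using two of the cube dimensions from the query (values are made up).
rows = [
    {"iFrom": 1, "iSrcId": 10, "iclickcnt": 5},
    {"iFrom": 1, "iSrcId": 20, "iclickcnt": 3},
    {"iFrom": 2, "iSrcId": 10, "iclickcnt": 4},
]
cols = ["iFrom", "iSrcId"]

# Every input row is replicated once per grouping set before aggregation.
n_projections = sum(1 for _ in expand(rows, cols, cube_grouping_sets(cols)))
result = aggregate_on_expand(rows, cols, "iclickcnt")
print(n_projections)              # 12 (3 rows x 4 grouping sets)
print(result[((None, None), 3)])  # 12, the grand total
{code}

The point of the model is the replication inside expand: the data fed to the single Aggregate is the input multiplied by the number of grouping sets, which is what drives the shuffle write described above.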

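The blow-up reported for the 6-dimensional query follows from simple counting: WITH CUBE over n columns yields 2^n grouping sets, WITH ROLLUP yields only n + 1, and Expand emits one projection per input row per grouping set. The sketch below uses hypothetical helper names and counts rows only, ignoring row width, compression and serialization.

{code:python}
def cube_grouping_set_count(n_columns):
    # WITH CUBE keeps every subset of the grouping columns: 2^n sets.
    return 2 ** n_columns


def rollup_grouping_set_count(n_columns):
    # WITH ROLLUP keeps only the prefixes, e.g. (a, b, c), (a, b), (a), (): n + 1 sets.
    return n_columns + 1


def expand_output_rows(input_rows, n_grouping_sets):
    # Expand emits one projection per input row per grouping set, so the rows
    # entering the shuffle grow linearly with the number of grouping sets.
    return input_rows * n_grouping_sets


print(cube_grouping_set_count(6))                              # 64
print(rollup_grouping_set_count(6))                            # 7
print(expand_output_rows(1_000, cube_grouping_set_count(6)))   # 64000
{code}

As a rough consistency check only, 3GB x 64 is about 192GB, the same order of magnitude as the roughly 250GB of shuffle write reported in the issue; the two figures are not directly comparable, since the on-disk input format and the shuffled row format differ.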

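The proposed alternative in the issue, one lightweight Aggregate per grouping set with the results combined by a Union, can be sketched in the same simplified style. This is an illustrative model under stated assumptions, not the change proposed on the issue: helper names are hypothetical, the grouping set index stands in for a grouping ID, each call to aggregate_one_set stands for one full scan of the input followed by an ordinary GROUP BY, and the toy rows are made up.

{code:python}
from collections import defaultdict
from itertools import combinations


def cube_grouping_sets(columns):
    """Every subset of the grouping columns, which is what WITH CUBE expands to."""
    return [set(subset)
            for k in range(len(columns), -1, -1)
            for subset in combinations(columns, k)]


def aggregate_one_set(rows, columns, gset, measure):
    """One lightweight Aggregate: a plain GROUP BY over the columns in gset.

    Each call rescans all rows, but every row lands in exactly one group
    instead of being replicated once per grouping set."""
    sums = defaultdict(int)
    for row in rows:
        key = tuple(row[c] if c in gset else None for c in columns)
        sums[key] += row[measure]
    return sums


def cube_as_union(rows, columns, measure):
    """Run the per-set aggregates one after another and concatenate
    (UNION ALL) their outputs, tagging each group with its set index."""
    result = {}
    for gid, gset in enumerate(cube_grouping_sets(columns)):
        for key, total in aggregate_one_set(rows, columns, gset, measure).items():
            result[(key, gid)] = total
    return result


# Toy input using two of the cube dimensions from the query (values are made up).
rows = [
    {"iFrom": 1, "iSrcId": 10, "iclickcnt": 5},
    {"iFrom": 1, "iSrcId": 20, "iclickcnt": 3},
    {"iFrom": 2, "iSrcId": 10, "iclickcnt": 4},
]
result = cube_as_union(rows, ["iFrom", "iSrcId"], "iclickcnt")
print(len(result))             # 8 groups across the 4 grouping sets
print(result[((1, None), 1)])  # 8, the subtotal for iFrom = 1
{code}

In this model the Union produces the same groups and sums as a single Expand-plus-Aggregate plan would; the trade-off is the one the issue describes, scanning the input once per grouping set in exchange for never materializing the replicated Expand output.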

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)