Posted to issues@spark.apache.org by "Feng Zhu (JIRA)" <ji...@apache.org> on 2017/09/09 12:33:00 UTC

[jira] [Created] (SPARK-21964) Enable splitting the Aggregate (on Expand) into a number of Aggregates for grouping analytics

Feng Zhu created SPARK-21964:
--------------------------------

             Summary: Enable splitting the Aggregate (on Expand) into a number of Aggregates for grouping analytics
                 Key: SPARK-21964
                 URL: https://issues.apache.org/jira/browse/SPARK-21964
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.2.0, 2.1.1, 2.1.0, 2.0.2, 2.0.0
            Reporter: Feng Zhu


In current versions, Spark SQL implements the grouping analytics clauses (i.e., cube, rollup and grouping sets) as a single Aggregate operator on top of a single Expand operator. With this implementation, the table is read only once.
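
For illustration, here is a minimal 2-dimensional cube over a hypothetical table t (columns a, b, v); it compiles to a single Expand feeding a single Aggregate. The plan shown in the comments is an abridged sketch; exact operator names and the grouping-id encoding vary across versions:

{code:sql}
-- Minimal illustration (hypothetical table t with columns a, b, v):
EXPLAIN
SELECT a, b, SUM(v)
FROM t
GROUP BY a, b WITH CUBE;

-- Abridged physical plan: one Aggregate pair around the shuffle, fed by a
-- single Expand that emits one projection per grouping set (4 = 2^2 here):
--
--   HashAggregate(keys=[a, b, spark_grouping_id], functions=[sum(v)])
--   +- Exchange hashpartitioning(a, b, spark_grouping_id, 200)
--      +- HashAggregate(keys=[a, b, spark_grouping_id], functions=[partial_sum(v)])
--         +- Expand [[a, b, 0], [a, null, 1], [null, b, 2], [null, null, 3]]
--            +- Scan of t
{code}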

However, in many scenarios (e.g., a high-dimensional cube), the Expand operator becomes too "heavy": it carries a large number of projections, resulting in a vast amount of shuffle write.

In our production environment, we have encountered many such cases, leading to poor performance or even OOM errors in direct buffer memory.

We demonstrate the issue with the following real-world query over a 6-dimensional cube.
  
{code:sql}
SELECT CASE
           WHEN grouping(iFrom) = 1 THEN -1
           ELSE iFrom
       END AS iFrom
      ,CASE
           WHEN grouping(iSrcId) = 1 THEN -1
           ELSE iSrcId
       END AS iSrcId
      ,CASE
           WHEN grouping(sgametype) = 1 THEN '-1'
           ELSE sgametype
       END AS sgametype
      ,CASE
           WHEN grouping(iOperId) = 1 THEN -1
           ELSE iOperId
       END AS iOperId
      ,CASE
           WHEN grouping(igameid) = 1 THEN -1
           ELSE igameid
       END AS igameid
      ,CASE
           WHEN grouping(iacttypeid) = 1 THEN -1
           ELSE iacttypeid
       END AS iacttypeid
      ,SUM(iclickcnt) AS iclickcnt
FROM p_day_advert
WHERE  statedate = 20170810
GROUP BY iFrom, iSrcId, sgametype, iOperId, igameid, iacttypeid WITH CUBE
{code}

For this query, the Expand operator generates 64 (i.e., 2^6) projections, so every input row is replicated 64 times before the shuffle. Though the query reads only about 3GB of data, it produces about 250GB of shuffle write. In our environment, the first stage takes about 2 hours, and the second stage easily hits an OOM error unless we enlarge some configurations.

Therefore, we would like to provide another option that enables splitting the heavyweight Aggregate into a number of lightweight Aggregates, one per grouping set. In effect, it implements the grouping analytics as a Union and executes the aggregates one by one. Though this reads the data multiple times, it can still achieve much better overall performance.
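
As a sketch of the idea (not the actual implementation), here is how a 2-dimensional cube over two of the columns above could be expressed as a Union of per-group aggregates:

{code:sql}
-- Sketch only: a 2-dimensional cube over iFrom and iSrcId rewritten as a
-- UNION ALL of one plain aggregate per grouping set (4 = 2^2 branches).
-- The -1 fillers play the role of the grouping() CASE expressions in the
-- original query.
SELECT iFrom, iSrcId, SUM(iclickcnt) AS iclickcnt
FROM p_day_advert WHERE statedate = 20170810
GROUP BY iFrom, iSrcId
UNION ALL
SELECT iFrom, -1 AS iSrcId, SUM(iclickcnt) AS iclickcnt
FROM p_day_advert WHERE statedate = 20170810
GROUP BY iFrom
UNION ALL
SELECT -1 AS iFrom, iSrcId, SUM(iclickcnt) AS iclickcnt
FROM p_day_advert WHERE statedate = 20170810
GROUP BY iSrcId
UNION ALL
SELECT -1 AS iFrom, -1 AS iSrcId, SUM(iclickcnt) AS iclickcnt
FROM p_day_advert WHERE statedate = 20170810;
{code}

The full 6-dimensional cube above corresponds to 64 such branches. Each branch scans the input again, but its shuffle carries only the rows for that one grouping set instead of the 64x-replicated Expand output.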

With this implementation, the above query completes in about 20 minutes, with each individual aggregation taking 1 to 4 minutes.


