Posted to issues@spark.apache.org by "Takeshi Yamamuro (JIRA)" <ji...@apache.org> on 2017/10/11 00:06:00 UTC

[jira] [Commented] (SPARK-22223) ObjectHashAggregate introduces unnecessary shuffle

    [ https://issues.apache.org/jira/browse/SPARK-22223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199597#comment-16199597 ] 

Takeshi Yamamuro commented on SPARK-22223:
------------------------------------------

The hash-based aggregate implementation requires its input to be partitioned on the keys in the group-by clause. In the looser case you suggested, it seems we still need the shuffles. Am I misunderstanding your suggestion?
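
To make the question concrete, here is a minimal plain-Scala sketch (no Spark involved) of what partitioning on {{Country}} alone implies for a finer grouping key. The {{Row}} type, the {{partitionOf}} function, the partition count and the sample values are all hypothetical, and the hash used is only a stand-in, not Spark's actual partitioner.

```scala
object PartitioningSketch {
  // Hypothetical row type and sample values, for illustration only.
  final case class Row(country: String, customerId: String, invoiceNo: String)

  val numPartitions = 4

  // Stand-in for hash partitioning on Country only (not Spark's real hash function).
  def partitionOf(row: Row): Int =
    java.lang.Math.floorMod(row.country.hashCode, numPartitions)

  val rows: Seq[Row] = Seq(
    Row("UK", "c1", "i1"),
    Row("UK", "c1", "i2"),
    Row("UK", "c2", "i3"),
    Row("FR", "c3", "i4"),
    Row("FR", "c3", "i5")
  )

  // For each finer key (Country, CustomerID), the set of partitions its rows land in.
  val partitionsPerGroup: Map[(String, String), Set[Int]] =
    rows
      .groupBy(r => (r.country, r.customerId))
      .map { case (key, group) => key -> group.map(partitionOf).toSet }

  // Equal (Country, CustomerID) implies equal Country, hence an equal partition id,
  // so every finer group should sit in exactly one partition.
  val eachGroupInOnePartition: Boolean =
    partitionsPerGroup.values.forall(_.size == 1)

  def main(args: Array[String]): Unit =
    println(eachGroupInOnePartition)
}
```

The sketch only shows the co-location property of the data. Whether the planner recognizes that property for {{ObjectHashAggregate}} is the open question in this issue.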

> ObjectHashAggregate introduces unnecessary shuffle
> --------------------------------------------------
>
>                 Key: SPARK-22223
>                 URL: https://issues.apache.org/jira/browse/SPARK-22223
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.2.0
>         Environment: Spark 2.2.0 and following.
> {{spark.sql.execution.useObjectHashAggregateExec = true}}
>            Reporter: Michele Costantino Soccio
>
> Since Spark 2.2, a {{groupBy}} followed by {{collect_list}} introduces an unnecessary shuffle when the partitioning from the previous step is based on looser criteria than the current {{groupBy}}.
> For example:
> {code:java}
> //sample data from https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
> import org.apache.spark.sql.functions.{col, collect_list, expr}
> //Read the data and repartition by "Country"
> val retailDF = spark.sql("Select * from online_retail")
>     .repartition(col("Country"))
> //Group the data and collect.
> val aggregatedDF = retailDF
>   .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
>   .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
>   .agg(collect_list("Good").as("Goods"))
>   .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
>   .groupBy("Country", "CustomerID")
>   .agg(collect_list("Invoice").as("Invoices"))
>   .withColumn("Customer", expr("(CustomerID, Invoices)"))
>   .groupBy("Country")
>   .agg(collect_list("Customer").as("Customers"))
> {code}
> With {{ObjectHashAggregate}} left enabled, one gets the following physical plan:
> {noformat}
> == Physical Plan ==
> ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310])
> +- Exchange hashpartitioning(Country#14, 200)
>    +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
>       +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>          +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>             +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
>                +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>                   +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                      +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                         +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
>                            +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                               +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                                  +- Exchange hashpartitioning(Country#14, 200)
>                                     +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
> With Spark 2.1.0, or when {{ObjectHashAggregate}} is disabled, one gets a more efficient plan:
> {noformat}
> == Physical Plan ==
> SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310])
> +- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
>    +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>       +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>          +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>             +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                   +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                      +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0
>                         +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                            +- Exchange hashpartitioning(Country#14, 200)
>                               +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
> In this example, a quick run in a Databricks notebook showed that manually disabling {{ObjectHashAggregate}} gives an execution time of around 16s, versus the 25s needed when {{ObjectHashAggregate}} is enabled.
> The use of the {{ObjectHashAggregate}} in the {{groupBy}} was introduced with SPARK-17949.
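
For anyone wanting to compare the two plans locally, the following is a rough sketch of toggling the setting named in the Environment field and printing the physical plan each time. It assumes a spark-shell or notebook session where {{spark}} is already defined, as in the snippet above. The toy rows, {{toyDF}} and {{showPlan}} are hypothetical stand-ins, not part of the original report, and which operators and exchanges appear will depend on the Spark version.

```scala
// Assumes `spark` (a SparkSession) is already available, e.g. in spark-shell or a notebook.
import org.apache.spark.sql.functions.{col, collect_list}
import spark.implicits._

// Hypothetical toy rows standing in for the online_retail table.
val toyDF = Seq(
  ("UK", "c1", "i1"),
  ("UK", "c1", "i2"),
  ("FR", "c2", "i3")
).toDF("Country", "CustomerID", "InvoiceNo")
  .repartition(col("Country")) // same looser partitioning as in the report

// Hypothetical helper: set the flag, then print the physical plan of a finer groupBy.
def showPlan(useObjectHashAggregate: Boolean): Unit = {
  spark.conf.set(
    "spark.sql.execution.useObjectHashAggregateExec",
    useObjectHashAggregate.toString)
  toyDF
    .groupBy("Country", "CustomerID")
    .agg(collect_list("InvoiceNo").as("Invoices"))
    .explain()
}

showPlan(useObjectHashAggregate = true)
showPlan(useObjectHashAggregate = false)
```

Counting the {{Exchange}} nodes in the two printed plans is a quick way to see whether an extra shuffle is planned on a given version.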


