Posted to issues@spark.apache.org by "Takeshi Yamamuro (JIRA)" <ji...@apache.org> on 2017/10/11 00:06:00 UTC
[jira] [Commented] (SPARK-22223) ObjectHashAggregate introduces unnecessary shuffle
[ https://issues.apache.org/jira/browse/SPARK-22223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16199597#comment-16199597 ]
Takeshi Yamamuro commented on SPARK-22223:
------------------------------------------
The hash-based aggregate implementation requires the data to be partitioned on the columns named in the group-by clause. In the looser case you describe, it seems a shuffle is still needed. Am I misunderstanding your suggestion?
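
To make the question concrete, here is a minimal plain-Scala sketch (no Spark involved) of the data-placement property the report relies on: if rows are hash-partitioned on Country alone, every (Country, CustomerID) group already sits in a single partition. The names `RetailRow`, `partitionOf`, and `partitionsPerGroup` are hypothetical, and the modulo-of-hashCode partitioner is only a stand-in for Spark's hash partitioning. It says nothing about how the planner actually decides whether to insert an Exchange.

```scala
// Hypothetical row type; only the two grouping columns matter here.
case class RetailRow(country: String, customerId: Int)

object CoLocationSketch {
  // Stand-in for hash partitioning: non-negative modulo of the key's hash code.
  // Assumption: this is NOT Spark's real hash function; only determinism matters.
  def partitionOf(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // For each finer group (country, customerId), the set of partitions its rows
  // land in when partitioning uses the looser key (country) only.
  def partitionsPerGroup(rows: Seq[RetailRow],
                         numPartitions: Int): Map[(String, Int), Set[Int]] =
    rows
      .map(r => ((r.country, r.customerId), partitionOf(r.country, numPartitions)))
      .groupBy(_._1)
      .map { case (group, pairs) => group -> pairs.map(_._2).toSet }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      RetailRow("France", 1), RetailRow("France", 2), RetailRow("Japan", 1),
      RetailRow("France", 1), RetailRow("Japan", 3), RetailRow("Japan", 1)
    )
    val placement = partitionsPerGroup(rows, numPartitions = 4)
    // Each finer group occupies exactly one partition, so no rows would need
    // to move between partitions to aggregate on (country, customerId).
    assert(placement.values.forall(_.size == 1))
    placement.foreach { case (group, parts) => println(s"$group -> $parts") }
  }
}
```

The property holds for any deterministic partitioner, because rows that agree on (Country, CustomerID) necessarily agree on Country.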
> ObjectHashAggregate introduces unnecessary shuffle
> --------------------------------------------------
>
> Key: SPARK-22223
> URL: https://issues.apache.org/jira/browse/SPARK-22223
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.2.0
> Environment: Spark 2.2.0 and following.
> {{spark.sql.execution.useObjectHashAggregateExec = true}}
> Reporter: Michele Costantino Soccio
>
> Since Spark 2.2, a {{groupBy}} followed by {{collect_list}} introduces an unnecessary shuffle when the partitioning from the previous step is based on looser criteria (a subset of the keys) than the current {{groupBy}}.
> For example:
> {code:java}
> // Sample data from https://github.com/databricks/Spark-The-Definitive-Guide/tree/master/data/retail-data
> import org.apache.spark.sql.functions.{col, collect_list, expr}
>
> // Read the data and repartition it by "Country".
> val retailDF = spark.sql("Select * from online_retail")
>   .repartition(col("Country"))
>
> // Group at three nested levels (invoice, customer, country), collecting
> // the rows of each level into a list before grouping at the next one.
> val aggregatedDF = retailDF
>   .withColumn("Good", expr("(StockCode, UnitPrice, Quantity, Description)"))
>   .groupBy("Country", "CustomerID", "InvoiceNo", "InvoiceDate")
>   .agg(collect_list("Good").as("Goods"))
>   .withColumn("Invoice", expr("(InvoiceNo, InvoiceDate, Goods)"))
>   .groupBy("Country", "CustomerID")
>   .agg(collect_list("Invoice").as("Invoices"))
>   .withColumn("Customer", expr("(CustomerID, Invoices)"))
>   .groupBy("Country")
>   .agg(collect_list("Customer").as("Customers"))
> {code}
> With {{ObjectHashAggregate}} left enabled, one gets the following physical plan:
> {noformat}
> == Physical Plan ==
> ObjectHashAggregate(keys=[Country#14], functions=[finalmerge_collect_list(merge buf#317) AS collect_list(Customer#299, 0, 0)#310])
> +- Exchange hashpartitioning(Country#14, 200)
>    +- ObjectHashAggregate(keys=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#317])
>       +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>          +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>             +- Exchange hashpartitioning(Country#14, CustomerID#13, 200)
>                +- ObjectHashAggregate(keys=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>                   +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                      +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                         +- Exchange hashpartitioning(Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11, 200)
>                            +- ObjectHashAggregate(keys=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                               +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                                  +- Exchange hashpartitioning(Country#14, 200)
>                                     +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
> With Spark 2.1.0, or when {{ObjectHashAggregate}} is disabled, one gets a more efficient plan in which the only exchange is the initial repartition by Country:
> {noformat}
> == Physical Plan ==
> SortAggregate(key=[Country#14], functions=[finalmerge_collect_list(merge buf#3834) AS collect_list(Customer#299, 0, 0)#310])
> +- SortAggregate(key=[Country#14], functions=[partial_collect_list(Customer#299, 0, 0) AS buf#3834])
>    +- *Project [Country#14, named_struct(CustomerID, CustomerID#13, Invoices, Invoices#294) AS Customer#299]
>       +- SortAggregate(key=[Country#14, CustomerID#13], functions=[finalmerge_collect_list(merge buf#319) AS collect_list(Invoice#278, 0, 0)#293])
>          +- SortAggregate(key=[Country#14, CustomerID#13], functions=[partial_collect_list(Invoice#278, 0, 0) AS buf#319])
>             +- *Project [Country#14, CustomerID#13, named_struct(InvoiceNo, InvoiceNo#7, InvoiceDate, InvoiceDate#11, Goods, Goods#271) AS Invoice#278]
>                +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[finalmerge_collect_list(merge buf#321) AS collect_list(Good#249, 0, 0)#270])
>                   +- SortAggregate(key=[Country#14, CustomerID#13, InvoiceNo#7, InvoiceDate#11], functions=[partial_collect_list(Good#249, 0, 0) AS buf#321])
>                      +- *Sort [Country#14 ASC NULLS FIRST, CustomerID#13 ASC NULLS FIRST, InvoiceNo#7 ASC NULLS FIRST, InvoiceDate#11 ASC NULLS FIRST], false, 0
>                         +- *Project [InvoiceNo#7, InvoiceDate#11, CustomerID#13, Country#14, named_struct(StockCode, StockCode#8, UnitPrice, UnitPrice#12, Quantity, Quantity#10, Description, Description#9) AS Good#249]
>                            +- Exchange hashpartitioning(Country#14, 200)
>                               +- *FileScan csv default.online_retail[InvoiceNo#7,StockCode#8,Description#9,Quantity#10,InvoiceDate#11,UnitPrice#12,CustomerID#13,Country#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[dbfs:/FileStore/tables/scgc0grb1506404260438], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<InvoiceNo:string,StockCode:string,Description:string,Quantity:string,InvoiceDate:string,Un...
> {noformat}
> In this example, a quick run in a Databricks notebook showed an execution time of around 16s with {{ObjectHashAggregate}} manually disabled, versus about 25s with it enabled.
> The use of the {{ObjectHashAggregate}} in the {{groupBy}} was introduced with SPARK-17949.
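
For reference, the workaround described in the report (disabling {{ObjectHashAggregate}}) can be applied per session roughly as follows. This is a sketch under assumptions: it presumes a Spark 2.2+ environment where a session can be obtained with getOrCreate (in a notebook, the existing `spark` value can be used instead), and the configuration key is the one given in the Environment field of the issue.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: an already-configured environment; reuses the active session if any.
val spark = SparkSession.builder().getOrCreate()

// Key taken from the issue's Environment field; "false" turns the operator off
// for this session.
spark.conf.set("spark.sql.execution.useObjectHashAggregateExec", "false")

// Rebuild aggregatedDF from the example in the report after this point and
// call aggregatedDF.explain() to compare the physical plans.
```

The setting affects planning, so a DataFrame should be defined and explained after the change to see its effect.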