Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/10/17 04:49:00 UTC
[jira] [Resolved] (SPARK-22276) Unnecessary repartitioning
[ https://issues.apache.org/jira/browse/SPARK-22276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-22276.
----------------------------------
Resolution: Duplicate
> Unnecessary repartitioning
> --------------------------
>
> Key: SPARK-22276
> URL: https://issues.apache.org/jira/browse/SPARK-22276
> Project: Spark
> Issue Type: Bug
> Components: Optimizer
> Affects Versions: 2.2.0
> Reporter: Fernando Pereira
>
> When a DataFrame is sorted, it is partitioned with a RangePartitioner.
> If we later aggregate by exactly the same fields the sort was applied to, the plan contains a new (apparently useless) Exchange that repartitions with a HashPartitioner.
> In my use case the groupBy exchange is still very costly, as the aggregate function does not reduce the data volume.
> Is there any reason why groupBy always shuffles data, or could this be improved?
> Is there currently a way to work around this, without resorting to mapPartitions?
> Example:
> {code}
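> from pyspark.sql import functions as F
>
> # nrn_vals is an existing DataFrame; print its schema first.
> nrn_vals.printSchema()
> # Sort by post_gid, aggregate by the same column, then print the physical plan.
> (nrn_vals
>     .sort("post_gid")
>     .groupBy("post_gid")
>     .agg(F.collect_list("pre_gid").alias("pre_gids"))
> ).explain()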
> {code}
> This outputs the following:
> {code}
> root
> |-- pre_gid: integer (nullable = true)
> |-- post_gid: integer (nullable = true)
> |-- floatvec: array (nullable = false)
> |    |-- element: float (containsNull = true)
> == Physical Plan ==
> ObjectHashAggregate(keys=[post_gid#1386], functions=[collect_list(pre_gid#1385, 0, 0)])
> +- Exchange hashpartitioning(post_gid#1386, 1)
>    +- ObjectHashAggregate(keys=[post_gid#1386], functions=[partial_collect_list(pre_gid#1385, 0, 0)])
>       +- *Sort [post_gid#1386 ASC NULLS FIRST], true, 0
>          +- Exchange rangepartitioning(post_gid#1386 ASC NULLS FIRST, 1)
>             +- *FileScan parquet [pre_gid#1385,post_gid#1386] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/media/psf/Home/dev/Functionalizer/pyspark/spykfunc_output/extended_touche..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<pre_gid:int,post_gid:int>
> {code}
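
The two shuffles the report describes can be picked out of the plan text mechanically. The following is a minimal sketch in plain Python (no Spark needed) that scans the physical plan quoted above for Exchange operators and reports the partitioning scheme of each. The names `PLAN`, `EXCHANGE_RE`, and `find_exchanges` are hypothetical helpers introduced for illustration, not part of any Spark API, and the pattern assumes the textual `explain()` layout shown in this report, which may differ in other Spark versions.

```python
import re

# Physical plan text copied from the report above. The FileScan line is
# shortened here; that does not affect the Exchange operators being searched.
PLAN = """\
ObjectHashAggregate(keys=[post_gid#1386], functions=[collect_list(pre_gid#1385, 0, 0)])
+- Exchange hashpartitioning(post_gid#1386, 1)
   +- ObjectHashAggregate(keys=[post_gid#1386], functions=[partial_collect_list(pre_gid#1385, 0, 0)])
      +- *Sort [post_gid#1386 ASC NULLS FIRST], true, 0
         +- Exchange rangepartitioning(post_gid#1386 ASC NULLS FIRST, 1)
            +- *FileScan parquet [pre_gid#1385,post_gid#1386]
"""

# Matches a plan line whose operator is Exchange, capturing the partitioning
# scheme (e.g. "hashpartitioning") and the text inside its parentheses.
EXCHANGE_RE = re.compile(r"^[\s:|+\-*]*Exchange\s+(\w+)\((.*)\)\s*$")


def find_exchanges(plan_text):
    """Return a (scheme, args) tuple for each Exchange line, top of plan first."""
    found = []
    for line in plan_text.splitlines():
        match = EXCHANGE_RE.match(line)
        if match:
            found.append((match.group(1), match.group(2)))
    return found


exchanges = find_exchanges(PLAN)
for scheme, args in exchanges:
    print(scheme, "->", args)
```

Run against the plan from the report, this finds two Exchange operators on `post_gid#1386`: the `rangepartitioning` introduced by the sort and the `hashpartitioning` introduced by the groupBy, which is the second shuffle the reporter is asking about.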