Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2017/10/17 04:49:00 UTC

[jira] [Resolved] (SPARK-22276) Unnecessary repartitioning

     [ https://issues.apache.org/jira/browse/SPARK-22276?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22276.
----------------------------------
    Resolution: Duplicate

> Unnecessary repartitioning
> --------------------------
>
>                 Key: SPARK-22276
>                 URL: https://issues.apache.org/jira/browse/SPARK-22276
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer
>    Affects Versions: 2.2.0
>            Reporter: Fernando Pereira
>
> When a DataFrame is sorted, it is partitioned with a RangePartitioner.
> If we later aggregate by the exact same fields the sort was applied to, the plan contains a new (apparently useless) Exchange that repartitions with a HashPartitioner.
> In my use case the groupBy exchange is still very costly, because the aggregate function does not reduce the data volume.
> Is there any reason why groupBy always shuffles data, or could this be improved?
> Is there currently a way to work around this, without resorting to mapPartitions?
> Example:
> {code}
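> from pyspark.sql import functions as F
> # nrn_vals is the reporter's DataFrame, read from Parquet (schema printed below)
> nrn_vals.printSchema()
> (nrn_vals
>  .sort("post_gid")
>  .groupBy("post_gid")
>  .agg(F.collect_list("pre_gid").alias("pre_gids"))
>  ).explain()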
> {code}
> This outputs the following:
> {code}
> root
>  |-- pre_gid: integer (nullable = true)
>  |-- post_gid: integer (nullable = true)
>  |-- floatvec: array (nullable = false)
>  |    |-- element: float (containsNull = true)
> == Physical Plan ==
> ObjectHashAggregate(keys=[post_gid#1386], functions=[collect_list(pre_gid#1385, 0, 0)])
> +- Exchange hashpartitioning(post_gid#1386, 1)
>    +- ObjectHashAggregate(keys=[post_gid#1386], functions=[partial_collect_list(pre_gid#1385, 0, 0)])
>       +- *Sort [post_gid#1386 ASC NULLS FIRST], true, 0
>          +- Exchange rangepartitioning(post_gid#1386 ASC NULLS FIRST, 1)
>             +- *FileScan parquet [pre_gid#1385,post_gid#1386] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/media/psf/Home/dev/Functionalizer/pyspark/spykfunc_output/extended_touche..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<pre_gid:int,post_gid:int>
> {code}
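
The reporter's reasoning can be illustrated with a small plain-Python model. This is only a sketch of the idea, not Spark code and not how Spark implements partitioning: the helpers range_partition and collect_list_per_partition and the bounds values are hypothetical names chosen for illustration. It shows that once rows are range-partitioned on post_gid, all rows with the same key already sit in the same partition, so a per-partition grouping gives complete groups without moving data again.

```python
from bisect import bisect_right
from collections import defaultdict


def range_partition(rows, key, bounds):
    """Place each row in a partition by comparing its key to sorted split points."""
    parts = [[] for _ in range(len(bounds) + 1)]
    for row in rows:
        # Equal keys always get the same index, so they share a partition.
        parts[bisect_right(bounds, row[key])].append(row)
    return parts


def collect_list_per_partition(parts, key, value):
    """Group each partition on its own, with no data movement between partitions."""
    result = {}
    for part in parts:
        groups = defaultdict(list)
        # sorted() is stable, so values keep their input order within each key.
        for row in sorted(part, key=lambda r: r[key]):
            groups[row[key]].append(row[value])
        # No key produced here may already have come from another partition.
        assert result.keys().isdisjoint(groups.keys())
        result.update(groups)
    return result


# Toy rows using the column names from the report; the values are made up.
rows = [
    {"pre_gid": 10, "post_gid": 3},
    {"pre_gid": 11, "post_gid": 1},
    {"pre_gid": 12, "post_gid": 3},
    {"pre_gid": 13, "post_gid": 7},
    {"pre_gid": 14, "post_gid": 1},
]

parts = range_partition(rows, "post_gid", bounds=[2, 5])
pre_gids = collect_list_per_partition(parts, "post_gid", "pre_gid")
print(pre_gids)
```

In this model the disjointness check never fires, which is the property the report relies on: a range partitioning on the grouping key already clusters equal keys together, so in principle it could serve the aggregation as well as a hash partitioning does.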


