Posted to issues@spark.apache.org by "Aman Omer (Jira)" <ji...@apache.org> on 2019/12/03 05:19:00 UTC
[jira] [Comment Edited] (SPARK-30101) Dataset distinct does not respect spark.default.parallelism
[ https://issues.apache.org/jira/browse/SPARK-30101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986619#comment-16986619 ]
Aman Omer edited comment on SPARK-30101 at 12/3/19 5:18 AM:
------------------------------------------------------------
{code:java}
scala> ds.distinct.explain
== Physical Plan ==
*(2) HashAggregate(keys=[value#13], functions=[])
+- Exchange hashpartitioning(value#13, 200), true, [id=#107]
+- *(1) HashAggregate(keys=[value#13], functions=[])
+- *(1) LocalTableScan [value#13]
{code}
Here, the `hashpartitioning` node is passing numPartitions as 200, which does not depend on `spark.default.parallelism`.
I am not sure whether this is the correct behavior for `distinct`.
[~sowen] [~wenchen] [~srowen@scient.com]
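A minimal spark-shell sketch of where the 200 likely comes from, assuming it is the default of `spark.sql.shuffle.partitions` (which governs Dataset/DataFrame shuffles, rather than `spark.default.parallelism`):
{code:java}
// Dataset/DataFrame shuffles take their partition count from
// spark.sql.shuffle.partitions (default 200), not spark.default.parallelism.
spark.conf.get("spark.sql.shuffle.partitions")   // "200" by default
spark.conf.set("spark.sql.shuffle.partitions", "2")
// Re-running ds.distinct.explain should now show hashpartitioning(value#13, 2)
{code}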
> Dataset distinct does not respect spark.default.parallelism
> -----------------------------------------------------------
>
> Key: SPARK-30101
> URL: https://issues.apache.org/jira/browse/SPARK-30101
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.0, 2.4.4
> Reporter: sam
> Priority: Major
>
> I'm creating a `SparkSession` like this:
> ```
> SparkSession
>   .builder().appName("foo").master("local")
>   .config("spark.default.parallelism", 2)
>   .getOrCreate()
> ```
> When I run
> ```
> ((1 to 10) ++ (1 to 10)).toDS().distinct().count()
> ```
> I get 200 partitions:
> ```
> 19/12/02 10:29:34 INFO TaskSchedulerImpl: Adding task set 1.0 with 200 tasks
> ...
> 19/12/02 10:29:34 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 46 ms on localhost (executor driver) (1/200)
> ```
> It is `distinct` that is broken: `ds.rdd.getNumPartitions` gives `2`, while `ds.distinct().rdd.getNumPartitions` gives `200`. By contrast, `ds.rdd.groupBy(identity).map(_._2.head)` and `ds.rdd.distinct()` behave correctly.
> Finally, I notice that the good old `RDD` interface has a `distinct` that accepts a `numPartitions` argument, while `Dataset` does not.
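> A minimal workaround sketch for spark-shell, assuming the shuffle partition count is set before running the query (`spark.sql.shuffle.partitions` is what Dataset shuffles consult):
> ```
> spark.conf.set("spark.sql.shuffle.partitions", "2")
> ((1 to 10) ++ (1 to 10)).toDS().distinct().rdd.getNumPartitions  // 2 with this setting
> ```
> Alternatively, `ds.distinct().repartition(2)` forces the partition count explicitly, at the cost of an extra shuffle.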
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org