Posted to issues@spark.apache.org by "Florentino Sainz (Jira)" <ji...@apache.org> on 2019/09/27 12:21:00 UTC

[jira] [Updated] (SPARK-29265) Window+collect_list causing single-task operation

     [ https://issues.apache.org/jira/browse/SPARK-29265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Florentino Sainz updated SPARK-29265:
-------------------------------------
    Description: 
Hi,

I ran into this problem in "real" environments and also built a self-contained test ([^Test.scala], attached).

Given this Window definition:
{code:scala}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.collect_list
val myWindow = Window.partitionBy($"word").orderBy("word")
val filt2 = filtrador.withColumn("avg_Time", collect_list($"number").over(myWindow))
{code}

In the test, all rows of my DataFrame end up in a single partition (a side effect of the global orderBy).

The full code showing the error is attached as Test.scala (note how the mapPartitions reports 99 rows in one partition). The sbt project (src and build.sbt) is attached as TestSpark.zip.
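
The check described above can be sketched roughly as follows. This is not the attached Test.scala; it is a minimal stand-in written under assumptions: the object name WindowPartitionCheck, the helper rowsPerPartition, the local master setting and the generated input (99 rows over three hypothetical words) are all made up for illustration. Only the Window definition and the column names come from the report.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.collect_list

object WindowPartitionCheck {

  // Returns the number of rows held by each physical partition
  // after the window function from the report has been applied.
  def rowsPerPartition(spark: SparkSession): Array[Int] = {
    import spark.implicits._

    // Hypothetical input: 99 rows spread over three distinct words.
    val words = Seq("a", "b", "c")
    val filtrador = (1 to 99)
      .map(i => (words(i % words.length), i))
      .toDF("word", "number")

    // Window definition and column names taken from the report.
    val myWindow = Window.partitionBy($"word").orderBy("word")
    val filt2 = filtrador.withColumn("avg_Time", collect_list($"number").over(myWindow))

    // Print the physical plan so the exchange and sort added for the window are visible.
    filt2.explain()

    // Count the rows in every partition, as the mapPartitions check in the report does.
    filt2.rdd.mapPartitions(it => Iterator(it.size)).collect()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]") // assumption: local run, purely for illustration
      .appName("SPARK-29265-sketch")
      .getOrCreate()

    val counts = rowsPerPartition(spark)
    // Show only the partitions that actually hold rows.
    println(counts.zipWithIndex.filter(_._1 > 0).mkString(", "))

    spark.stop()
  }
}
```

When reading the output, keep in mind that partitionBy($"word") sends all rows with the same word to the same partition, so an input with a single distinct word would land in one partition regardless of the orderBy. Varying the number of distinct words in the generated input may help separate that effect from the one reported here.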

  was:
Hi,

I ran into this problem in "real" environments and also built a self-contained test ([^Test.scala], attached).

Given this Window definition:
{code:scala}
val myWindow = Window.partitionBy($"word").orderBy("word") 
val filt2 = filtrador.withColumn("avg_Time", collect_list($"number").over(myWindow))
{code}

As a user, I would expect either:

1- An error or warning (because the window sorts on one of its own partitionBy columns), or

2- A mostly useless operation that just orders the rows (which all share the same value of word) inside each window, without much impact on performance.

What I currently see:

*When I use "myWindow" on any DataFrame, the Window.orderBy somehow performs a global orderBy on the whole DataFrame, similar to dataframe.orderBy("word").*

*In my real environment, the program either did not finish in time or crashed (because it is a global orderBy, all the data goes to one executor).*

In the test, all rows of my DataFrame end up in a single partition (a side effect of the global orderBy).

The full code showing the error is attached as Test.scala (note how the mapPartitions reports 99 rows in one partition). The sbt project (src and build.sbt) is attached as TestSpark.zip.


> Window+collect_list causing single-task operation
> -------------------------------------------------
>
>                 Key: SPARK-29265
>                 URL: https://issues.apache.org/jira/browse/SPARK-29265
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0
>         Environment: Any
>            Reporter: Florentino Sainz
>            Priority: Minor


