Posted to issues@spark.apache.org by "Florentino Sainz (Jira)" <ji...@apache.org> on 2019/09/26 21:43:00 UTC

[jira] [Created] (SPARK-29265) Window orderBy causing full-DF orderBy

Florentino Sainz created SPARK-29265:
----------------------------------------

             Summary: Window orderBy causing full-DF orderBy 
                 Key: SPARK-29265
                 URL: https://issues.apache.org/jira/browse/SPARK-29265
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.4, 2.4.3, 2.3.0
         Environment: Any
            Reporter: Florentino Sainz


Hi,

I hit this problem in "real" environments and have also put together a self-contained test (attached).

Having this Window definition:
{code:java}
val myWindow = Window.partitionBy($"word").orderBy("word")
val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))
{code}

As a user, I would expect either:

1- An error/warning (because the orderBy column is one of the partitionBy columns, so sorting inside the window is meaningless), or

2- A mostly useless but cheap operation which just orders the rows inside each window partition without hurting overall performance (see the workaround sketch below).
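For what it's worth, dropping the orderBy entirely looks like a safe workaround here (a sketch against the test code below, not verified for every case): because the orderBy column equals the partitionBy column, every row in a partition is an ordering peer of every other, so the default window frame already spans the whole partition and the avg should come out identical.
{code:java}
// Workaround sketch (assumes a whole-partition avg is the intent; "filtrador"
// is the DataFrame built in the attached test). Without an orderBy the frame
// is the entire partition, so the result matches the original definition
// while avoiding the sort that triggers the reported behavior.
val noOrderWindow = Window.partitionBy($"word")
val filt2b = filtrador.withColumn("avg_Time", avg($"number").over(noOrderWindow))
{code}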


Currently what I see:

*When I use "myWindow" on any DataFrame, that Window.orderBy somehow triggers a global orderBy of the whole DataFrame, similar to dataframe.orderBy("word").*

*In my real environment the job became very slow or crashed (it didn't finish in time), because a global orderBy funnels all rows through a single executor.*

In the test I can see that all elements of my DF end up in a single partition (a side effect of the global orderBy).
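One way to check what the planner actually produced (a diagnostic sketch, assuming the repro below has run): printing the physical plan should show whether the Exchange hash-partitions on "word" with a partition-local sort, as expected for a window, or range-partitions globally as a full orderBy would.
{code:java}
// Diagnostic sketch: dump the logical and physical plans. An Exchange
// hashpartitioning(word) followed by a partition-local Sort is the expected
// shape; an Exchange rangepartitioning(word) would match the reported
// global sort.
filt2.explain(true)
{code}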
Full code showing the error (note how the mapPartitions check prints 99 rows in one partition). You can paste it into IntelliJ or any other IDE and it should work:

{code:java}
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .master("local[4]")
      .appName("Word Count")
      .getOrCreate()

    import spark.implicits._

    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)
    )
    val expectedData = Seq(
      Row(8, "bat", "test"),
      Row(64, "mouse", "test"),
      Row(-27, "horse", "test")
    )

    // 3 source rows, exploded into 99 copies each, so every "word" has 99 rows.
    val filtrador = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    ).withColumn("dummy", explode(array((1 until 100).map(lit): _*)))

    // Window partitioned and ordered by the same column.
    val myWindow = Window.partitionBy($"word").orderBy("word")
    val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))
    filt2.show

    // Print the number of rows in each partition: the 99 rows of a word all
    // land in a single partition, as if a global orderBy had run.
    filt2.rdd.mapPartitions(iter => Iterator(iter.size), true).collect().foreach(println)
  }
}
{code}
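For comparison, running the same per-partition count with a window that only partitions, without ordering, should show each word's 99 rows landing in whichever shuffle partition its hash maps to, rather than everything collapsing together (a sketch, reusing the DataFrame above):
{code:java}
// Comparison sketch, reusing "filtrador" from the code above: the same
// partition-size check without the window orderBy. Each word's 99 rows
// should land in the shuffle partition its hash maps to instead of all
// rows collapsing into one partition.
val noSort = filtrador.withColumn("avg_Time",
  avg($"number").over(Window.partitionBy($"word")))
noSort.rdd.mapPartitions(iter => Iterator(iter.size), true).collect().foreach(println)
{code}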


