Posted to issues@spark.apache.org by "Florentino Sainz (Jira)" <ji...@apache.org> on 2019/09/26 21:43:00 UTC
[jira] [Created] (SPARK-29265) Window orderBy causing full-DF orderBy
Florentino Sainz created SPARK-29265:
----------------------------------------
Summary: Window orderBy causing full-DF orderBy
Key: SPARK-29265
URL: https://issues.apache.org/jira/browse/SPARK-29265
Project: Spark
Issue Type: Bug
Components: Spark Core
Affects Versions: 2.4.4, 2.4.3, 2.3.0
Environment: Any
Reporter: Florentino Sainz
Hi,
I hit this problem in "real" environments and also reproduced it with a self-contained test (attached).
Having this Window definition:
{code:java}
val myWindow = Window.partitionBy($"word").orderBy("word")
val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))
{code}
As a user, I would expect either:
1- An error/warning (because I am trying to sort on one of the columns of the window's partitionBy)
2- A mostly-useless operation that just orders the rows inside each window partition but doesn't affect performance much.
What I currently see instead:
*When I use "myWindow" on any DataFrame, that Window.orderBy somehow performs a global orderBy of the whole DataFrame, similar to dataframe.orderBy("word").*
*In my real environment this made my program very slow, or it crashed or didn't finish in time (because a global orderBy funnels all the data through a single executor).*
In the test I can see that all elements of my DF end up in a single partition (a side effect of the global orderBy).
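For what it's worth, a sketch of a possible workaround (this is an assumption on my part, not a fix of the underlying behavior: it only applies when the aggregate does not depend on row order, which holds for avg) is to drop the orderBy from the window definition entirely, so no sort is requested at all:

```scala
// Sketch of a workaround, reusing "filtrador" from the reproduction code below.
// Assumption: avg() is order-insensitive, so the orderBy adds nothing and can go.
val myWindowNoSort = Window.partitionBy($"word")
val filt2NoSort = filtrador.withColumn("avg_Time", avg($"number").over(myWindowNoSort))
```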
Full code showing the error (note how the mapPartitions call reports 99 rows in one partition). You can paste it into IntelliJ or any other IDE and it should work:
{code:java}
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.autoBroadcastJoinThreshold", -1)
      .master("local[4]")
      .appName("Word Count")
      .getOrCreate()

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val expectedSchema = List(
      StructField("number", IntegerType, false),
      StructField("word", StringType, false),
      StructField("dummyColumn", StringType, false)
    )

    val expectedData = Seq(
      Row(8, "bat", "test"),
      Row(64, "mouse", "test"),
      Row(-27, "horse", "test")
    )

    // 3 words x 99 dummy values = 297 rows spread across the input partitions
    val filtrador = spark.createDataFrame(
      spark.sparkContext.parallelize(expectedData),
      StructType(expectedSchema)
    ).withColumn("dummy", explode(array((1 until 100).map(lit): _*)))

    val myWindow = Window.partitionBy($"word").orderBy("word")
    val filt2 = filtrador.withColumn("avg_Time", avg($"number").over(myWindow))

    filt2.show

    // Print the size of each partition to observe where the rows end up
    filt2.rdd.mapPartitions(iter => Iterator(iter.size), true).collect().foreach(println)
  }
}
{code}
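A quick way to check whether the window really introduces a global sort (a sketch, using the standard explain() API on the filt2 DataFrame from the code above) is to look at the physical plan: a global orderBy appears as an Exchange with rangepartitioning, while an ordinary window partitionBy appears as an Exchange with hashpartitioning followed by a per-partition Sort:

```scala
// Sketch: print the logical and physical plans for filt2.
// Look for "Exchange rangepartitioning(...)" (global sort) versus
// "Exchange hashpartitioning(word, ...)" (per-partition shuffle + sort).
filt2.explain(true)
```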
--
This message was sent by Atlassian Jira
(v8.3.4#803005)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org