Posted to issues@spark.apache.org by "Eyal Zituny (JIRA)" <ji...@apache.org> on 2017/07/17 14:54:00 UTC
[jira] [Created] (SPARK-21443) Very long planning duration for queries with lots of operations
Eyal Zituny created SPARK-21443:
-----------------------------------
Summary: Very long planning duration for queries with lots of operations
Key: SPARK-21443
URL: https://issues.apache.org/jira/browse/SPARK-21443
Project: Spark
Issue Type: Bug
Components: SQL, Structured Streaming
Affects Versions: 2.2.0
Reporter: Eyal Zituny
Creating a streaming query with a large number of operations and fields (100+) results in a very long query planning phase. In the example below, the planning phase took 35 seconds while the actual batch execution took only 1.3 seconds.
After some investigation, I have found that the root cause is two optimizer rules which seem to take most of the planning time: InferFiltersFromConstraints and PruneFilters.
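For anyone hitting the same symptom, the per-rule time can be checked directly. A minimal sketch, assuming a Spark 2.x build where the RuleExecutor companion object exposes dumpTimeSpent():
{code:java}
// Sketch: dump the cumulative time spent in each catalyst rule.
// Assumes org.apache.spark.sql.catalyst.rules.RuleExecutor.dumpTimeSpent()
// is available in the Spark build being used.
import org.apache.spark.sql.catalyst.rules.RuleExecutor

// ... trigger at least one micro-batch first (e.g. query.processAllAvailable()) ...
println(RuleExecutor.dumpTimeSpent())
{code}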
I would suggest the following:
# fix the inefficient optimizer rules
# add warn-level logging when a single rule takes more than xx ms
# allow custom removal of optimizer rules (the opposite of spark.experimental.extraOptimizations) - see the sketch after this list
# reuse query plans where possible (optional)
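Regarding suggestion 3, as a point of reference: later Spark releases (2.4.0+, via SPARK-24802) added spark.sql.optimizer.excludedRules, which allows exactly this. A sketch of how the two costly rules could be switched off with it; note that this config does not exist in 2.2, where the issue was reported:
{code:java}
// Sketch only: spark.sql.optimizer.excludedRules exists from Spark 2.4 on
// (SPARK-24802); on 2.2 setting it has no effect.
spark.conf.set("spark.sql.optimizer.excludedRules",
  "org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints," +
  "org.apache.spark.sql.catalyst.optimizer.PruneFilters")
{code}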
Reproducing this issue can be done with the script below, which simulates the scenario:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}
import org.apache.spark.sql.streaming.{ProcessingTime, StreamingQueryListener}

case class Product(pid: Long, name: String, price: Long, ts: Long = System.currentTimeMillis())

case class Events(eventId: Long, eventName: String, productId: Long) {
  def this(id: Long) = this(id, s"event$id", id % 100)
}

object SparkTestFlow {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("TestFlow")
      .master("local[8]")
      .getOrCreate()

    // Print the progress report (including durationMs) for every non-empty micro-batch
    spark.sqlContext.streams.addListener(new StreamingQueryListener {
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        if (event.progress.numInputRows > 0) {
          println(event.progress.toString())
        }
      }
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
    })

    import spark.implicits._
    implicit val sqlContext = spark.sqlContext
    import org.apache.spark.sql.functions.expr

    // Static lookup table of 100 products to join against
    val seq = (1L to 100L).map(i => Product(i, s"name$i", 10L * i))
    val lookupTable = spark.createDataFrame(seq)

    val inputData = MemoryStream[Events]
    inputData.addData((1L to 100L).map(i => new Events(i)))
    val events = inputData.toDF()
      .withColumn("w1", expr("0"))
      .withColumn("x1", expr("0"))
      .withColumn("y1", expr("0"))
      .withColumn("z1", expr("0"))

    // Each iteration adds 4 more computed columns, growing the plan
    val numberOfSelects = 40 // set to 100+ and the planning takes forever
    val dfWithSelectsExpr = (2 to numberOfSelects).foldLeft(events) { (df, i) =>
      val arr = df.columns ++ Array(s"w${i-1} + rand() as w$i", s"x${i-1} + rand() as x$i", s"y${i-1} + 2 as y$i", s"z${i-1} + 1 as z$i")
      df.selectExpr(arr: _*)
    }

    val withJoinAndFilter = dfWithSelectsExpr
      .join(lookupTable, expr("productId = pid"))
      .filter("productId < 50")

    val query = withJoinAndFilter.writeStream
      .outputMode("append")
      .format("console")
      .trigger(ProcessingTime(2000))
      .start()

    query.processAllAvailable()
    spark.stop()
  }
}
{code}
The query progress output will show:
{code:java}
"durationMs" : {
  "addBatch" : 1310,
  "getBatch" : 6,
  "getOffset" : 0,
  "*queryPlanning*" : 36924,
  "triggerExecution" : 38297,
  "walCommit" : 33
}
{code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org