You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Takeshi Yamamuro (JIRA)" <ji...@apache.org> on 2017/05/26 08:15:04 UTC
[jira] [Created] (SPARK-20895) Support fast execution based on an
optimized plan and parameter placeholders
Takeshi Yamamuro created SPARK-20895:
----------------------------------------
Summary: Support fast execution based on an optimized plan and parameter placeholders
Key: SPARK-20895
URL: https://issues.apache.org/jira/browse/SPARK-20895
Project: Spark
Issue Type: Improvement
Components: SQL
Affects Versions: 2.1.1
Reporter: Takeshi Yamamuro
Priority: Minor
In database scenarios, users sometimes use parameterized queries for repeated execution (e.g., by using prepared statements).
So, I think this functionality is also useful for Spark users.
What I suggest here seems to be like:
My prototype here: https://github.com/apache/spark/compare/master...maropu:PreparedStmt2
{code}
scala> Seq((1, 2), (2, 3)).toDF("col1", "col2").createOrReplaceTempView("t")
// Define a query with a parameter placeholder named `val`
scala> val df = sql("SELECT * FROM t WHERE col1 = $val")
scala> df.explain
== Physical Plan ==
*Project [_1#13 AS col1#16, _2#14 AS col2#17]
+- *Filter (_1#13 = cast(parameterholder(val) as int))
+- LocalTableScan [_1#13, _2#14]
// Apply optimizer rules and get an optimized logical plan with the parameter placeholder
scala> val preparedDf = df.prepared
// Bind an actual value and do execution
scala> preparedDf.bindParam("val", 1).show()
+----+----+
|col1|col2|
+----+----+
| 1| 2|
+----+----+
{code}
To implement this, my prototype adds a new expression leaf node named `ParameterHolder`.
In a binding phase, this node is replaced with `Literal` including an actual value by using `bindParam`.
Currently, Spark sometimes consumes much time to rewrite logical plans in `Optimizer` (e.g. constant propagation desribed in SPARK-19846).
So, I feel this approach is also helpful in that case:
{code}
def timer[R](f: => {}): Unit = {
val count = 9
val iters = (0 until count).map { i =>
val t0 = System.nanoTime()
f
val t1 = System.nanoTime()
val elapsed = t1 - t0 + 0.0
println(s"#$i: ${elapsed / 1000000000.0}")
elapsed
}
println("Avg. Elapsed Time: " + ((iters.sum / count) / 1000000000.0) + "s")
}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val numCols = 50
val df = spark.range(100).selectExpr((0 until numCols).map(i => s"id AS _c$i"): _*)
// Add conditions to take much time in Optimizer
val filter = (0 until 128).foldLeft(lit(false))((e, i) => e.or(df.col(df.columns(i % numCols)) === (rand() * 10).cast("int")))
val df2 = df.filter(filter).sort(df.columns(0))
// Regular path
timer {
df2.filter(df2.col(df2.columns(0)) === lit(3)).collect
df2.filter(df2.col(df2.columns(0)) === lit(4)).collect
df2.filter(df2.col(df2.columns(0)) === lit(5)).collect
df2.filter(df2.col(df2.columns(0)) === lit(6)).collect
df2.filter(df2.col(df2.columns(0)) === lit(7)).collect
df2.filter(df2.col(df2.columns(0)) === lit(8)).collect
}
#0: 24.178487906
#1: 22.619839888
#2: 22.318617035
#3: 22.131305502
#4: 22.532095611
#5: 22.245152778
#6: 22.314114847
#7: 22.284385952
#8: 22.053593855
Avg. Elapsed Time: 22.519732597111112s
// Prepared path
val df3b = df2.filter(df2.col(df2.columns(0)) === param("val")).prepared
timer {
df3b.bindParam("val", 3).collect
df3b.bindParam("val", 4).collect
df3b.bindParam("val", 5).collect
df3b.bindParam("val", 6).collect
df3b.bindParam("val", 7).collect
df3b.bindParam("val", 8).collect
}
#0: 0.744693912
#1: 0.743187129
#2: 0.745100003
#3: 0.721668718
#4: 0.757573342
#5: 0.763240883
#6: 0.731287275
#7: 0.728740601
#8: 0.674275592
Avg. Elapsed Time: 0.7344186061111112s
{code}
I'm not sure this approach is acceptable, so welcome any suggestion and advice.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org