Posted to user@spark.apache.org by Michael David Pedersen <mi...@googlemail.com> on 2016/10/25 16:01:43 UTC

Transforming Spark SQL AST with extraOptimizations

Hi,

I want to take a SQL string as user input, then transform it before
execution. In particular, I want to modify the top-level projection (the
SELECT clause), injecting additional columns to be retrieved by the query.
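
To make the goal concrete, here is a sketch of the kind of rule I have in
mind. (InjectColumnRule and the literal "injected" column are placeholder
names of my own, not code I've actually run.)

import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

object InjectColumnRule extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
        // Append a placeholder column. The guard keeps the rule idempotent,
        // since optimiser batches are run to a fixed point.
        case p: Project if !p.projectList.exists(_.name == "injected") =>
            p.copy(projectList = p.projectList :+ Alias(Literal(1), "injected")())
    }
}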

I was hoping to achieve this by hooking into Catalyst using
sparkSession.experimental.extraOptimizations. I know that what I'm
attempting isn't strictly speaking an optimisation (the transformation
changes the semantics of the SQL statement), but the API still seems
suitable. However, my transformation seems to be ignored by the query
executor.

Here is a minimal example to illustrate the issue I'm having. First define
a row case class:

case class TestRow(a: Int, b: Int, c: Int)

Then define an optimisation rule which simply discards any projection:

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

object RemoveProjectOptimisationRule extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
        // Replace every projection node with its child, discarding the
        // SELECT list entirely.
        case x: Project => x.child
    }
}

Now create a dataset, register the optimisation, and run a SQL query:

// Create a dataset and register table (toDS() needs the session's implicits).
import sparkSession.implicits._
val dataset = List(TestRow(1, 2, 3)).toDS()
val tableName: String = "testtable"
dataset.createOrReplaceTempView(tableName)

// Register "optimisation".
sparkSession.experimental.extraOptimizations =
    Seq(RemoveProjectOptimisationRule)

// Run query.
val projected = sparkSession.sql("SELECT a FROM " + tableName + " WHERE a = 1")

// Print query result and the queryExecution object.
println("Query result:")
projected.collect.foreach(println)
println(projected.queryExecution)

Here is the output:

Query result:
[1]

== Parsed Logical Plan ==
'Project ['a]
+- 'Filter ('a = 1)
   +- 'UnresolvedRelation `testtable`

== Analyzed Logical Plan ==
a: int
Project [a#3]
+- Filter (a#3 = 1)
   +- SubqueryAlias testtable
      +- LocalRelation [a#3, b#4, c#5]

== Optimized Logical Plan ==
Filter (a#3 = 1)
+- LocalRelation [a#3, b#4, c#5]

== Physical Plan ==
*Filter (a#3 = 1)
+- LocalTableScan [a#3, b#4, c#5]

We see that the result is identical to that of the original SQL statement:
only column a is returned, as if the transformation had not been applied.
Yet the optimised logical plan and the physical plan show that the
projection has indeed been removed. I've also confirmed (through debug log
output) that the transformation is being invoked.
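
If it helps to see the mismatch directly, the schemas disagree (a diagnostic
sketch, not part of the output above):

// Dataset.schema is derived from the analysed plan, so it no longer matches
// the optimised plan once the rule has fired.
println(projected.schema.simpleString)                              // struct<a:int>
println(projected.queryExecution.optimizedPlan.schema.simpleString) // struct<a:int,b:int,c:int>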

Any suggestions as to what's going on here? Maybe the optimiser simply
ignores "optimisations" that change semantics?

If extraOptimizations isn't the way to go, can anybody suggest an
alternative? All I really want to do is parse the input SQL statement,
transform it, and pass the transformed AST to Spark for execution. But as
far as I can see, the APIs for doing this are private to the Spark sql
package. It may be possible to use reflection, but I'd like to avoid that.
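
For reference, this is roughly what I'd like to be able to write. (A sketch
only: SqlRewriter is a made-up name, and because sessionState and
Dataset.ofRows are private[sql], the helper would have to live inside the
org.apache.spark.sql package.)

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object SqlRewriter {
    // Parse the SQL, transform the raw AST, and hand it back to Spark,
    // which then analyses, optimises and executes the modified plan.
    def run(spark: SparkSession, sql: String)
           (transform: LogicalPlan => LogicalPlan): DataFrame = {
        val parsed = spark.sessionState.sqlParser.parsePlan(sql)
        Dataset.ofRows(spark, transform(parsed))
    }
}

This could then be called as, e.g., SqlRewriter.run(sparkSession, query)(myTransform).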

Any pointers would be much appreciated.

Cheers,
Michael

PS: I've previously posted this on StackOverflow:
http://stackoverflow.com/questions/40235566/transforming-spark-sql-ast-with-extraoptimizations