You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/03/17 13:25:13 UTC

spark git commit: [SPARK-12719][SQL] SQL generation support for Generate

Repository: spark
Updated Branches:
  refs/heads/master 8ef3399af -> 1974d1d34


[SPARK-12719][SQL] SQL generation support for Generate

## What changes were proposed in this pull request?

This PR adds SQL generation support for the `Generate` operator. It always converts the `Generate` operator into `LATERAL VIEW` format, because there are many limitations on putting a UDTF in the project list.

This PR is based on https://github.com/apache/spark/pull/11658; please see the last commit to review the actual changes.

Thanks to dilipbiswal for his initial work! This takes over https://github.com/apache/spark/pull/11596.

## How was this patch tested?

New tests in `LogicalPlanToSQLSuite`.

Author: Wenchen Fan <we...@databricks.com>

Closes #11696 from cloud-fan/generate.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1974d1d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1974d1d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1974d1d3

Branch: refs/heads/master
Commit: 1974d1d34d42c91730eaf45f7958cfab4827a14c
Parents: 8ef3399
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Mar 17 20:25:05 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Mar 17 20:25:05 2016 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/SQLBuilder.scala  |  65 +++++++++--
 .../spark/sql/hive/LogicalPlanToSQLSuite.scala  | 112 +++++++++++++++++++
 2 files changed, 170 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1974d1d3/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index cd417ce..05dfad2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -126,6 +126,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     case w: Window =>
       windowToSQL(w)
 
+    case g: Generate =>
+      generateToSQL(g)
+
     case Limit(limitExpr, child) =>
       s"${toSQL(child)} LIMIT ${limitExpr.sql}"
 
@@ -250,6 +253,42 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     )
   }
 
+  private def generateToSQL(g: Generate): String = {
+    val columnAliases = g.generatorOutput.map(_.sql).mkString(",")
+
+    val childSQL = if (g.child == OneRowRelation) {
+      // This only happens when we put UDTF in project list and there is no FROM clause. Because we
+      // always generate LATERAL VIEW for `Generate`, here we use a trick to put a dummy sub-query
+      // after FROM clause, so that we can generate a valid LATERAL VIEW SQL string.
+      // For example, if the original SQL is: "SELECT EXPLODE(ARRAY(1, 2))", we will convert it to
+      // LATERAL VIEW format, and generate:
+      // SELECT col FROM (SELECT 1) sub_q0 LATERAL VIEW EXPLODE(ARRAY(1, 2)) sub_q1 AS col
+      s"(SELECT 1) ${SQLBuilder.newSubqueryName}"
+    } else {
+      toSQL(g.child)
+    }
+
+    // The final SQL string for Generate contains 7 parts:
+    //   1. the SQL of child, can be a table or sub-query
+    //   2. the LATERAL VIEW keyword
+    //   3. an optional OUTER keyword
+    //   4. the SQL of generator, e.g. EXPLODE(array_col)
+    //   5. the table alias (from SQLBuilder.newSubqueryName) for the generator's output columns
+    //   6. the AS keyword
+    //   7. the column aliases (g.generatorOutput), can be more than one, e.g. AS key, value
+    // A concrete example: "tbl LATERAL VIEW EXPLODE(map_col) sub_q AS key, value", and the builder
+    // will put it in FROM clause later.
+    build(
+      childSQL,
+      "LATERAL VIEW",
+      if (g.outer) "OUTER" else "",
+      g.generator.sql,
+      SQLBuilder.newSubqueryName,
+      "AS",
+      columnAliases
+    )
+  }
+
   private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean =
     output1.size == output2.size &&
       output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
@@ -423,6 +462,17 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         case j: Join => j.copy(
           left = addSubqueryIfNeeded(j.left),
           right = addSubqueryIfNeeded(j.right))
+
+        // A special case for Generate. When we put UDTF in project list, followed by WHERE, e.g.
+        // SELECT EXPLODE(arr) FROM tbl WHERE id > 1, the Filter operator will be under Generate
+        // operator and we need to add a sub-query between them, as a WHERE clause cannot precede
+        // LATERAL VIEW, e.g. "... FROM tbl WHERE id > 1 LATERAL VIEW EXPLODE(arr) ..." is illegal.
+        case g @ Generate(_, _, _, _, _, f: Filter) =>
+          // Add an extra `Project` to make sure we can generate legal SQL string for sub-query,
+          // for example, Subquery -> Filter -> Table will generate "(tbl WHERE ...) AS name", which
+          // misses the SELECT part.
+          val proj = Project(f.output, f)
+          g.copy(child = addSubquery(proj))
       }
     }
 
@@ -431,13 +481,14 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     }
 
     private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match {
-      case _: SubqueryAlias => plan
-      case _: Filter => plan
-      case _: Join => plan
-      case _: LocalLimit => plan
-      case _: GlobalLimit => plan
-      case _: SQLTable => plan
-      case OneRowRelation => plan
+      case _: SubqueryAlias |
+           _: Filter |
+           _: Join |
+           _: LocalLimit |
+           _: GlobalLimit |
+           _: SQLTable |
+           _: Generate | // generateToSQL output is meant to be placed in a FROM clause as-is
+           OneRowRelation => plan
       case _ => addSubquery(plan)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1974d1d3/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index ca46c22..f3cb6f8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
 
 import scala.util.control.NonFatal
 
+import org.apache.spark.sql.Column
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SQLTestUtils
 
@@ -45,12 +46,28 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
       .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
       .write
       .saveAsTable("parquet_t2")
+
+    def createArray(id: Column): Column = {  // null when id % 3 == 0, else [id, id + 1]
+      when(id % 3 === 0, lit(null)).otherwise(array(id, id + 1))
+    }
+
+    sqlContext
+      .range(10)
+      .select(
+        createArray('id).as("arr"),
+        array(array('id), createArray('id)).as("arr2"),
+        lit("""{"f1": "1", "f2": "2", "f3": 3}""").as("json"),
+        'id
+      )
+      .write
+      .saveAsTable("parquet_t3")
   }
 
   override protected def afterAll(): Unit = {
     sql("DROP TABLE IF EXISTS parquet_t0")
     sql("DROP TABLE IF EXISTS parquet_t1")
     sql("DROP TABLE IF EXISTS parquet_t2")
+    sql("DROP TABLE IF EXISTS parquet_t3")  // table backing the generator tests
     sql("DROP TABLE IF EXISTS t0")
   }
 
@@ -625,4 +642,99 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
         |HAVING MAX(a.KEY) > 0
       """.stripMargin)
   }
+
+  test("generator in project list without FROM clause") {
+    checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3))")
+    checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) AS val")
+  }
+
+  test("generator in project list with non-referenced table") {
+    checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) FROM t0")
+    checkHiveQl("SELECT EXPLODE(ARRAY(1,2,3)) AS val FROM t0")
+  }
+
+  test("generator in project list with referenced table") {
+    checkHiveQl("SELECT EXPLODE(arr) FROM parquet_t3")
+    checkHiveQl("SELECT EXPLODE(arr) AS val FROM parquet_t3")
+  }
+
+  test("generator in project list with non-UDTF expressions") {
+    checkHiveQl("SELECT EXPLODE(arr), id FROM parquet_t3")
+    checkHiveQl("SELECT EXPLODE(arr) AS val, id AS a FROM parquet_t3")
+  }
+
+  test("generator in lateral view") {
+    checkHiveQl("SELECT val, id FROM parquet_t3 LATERAL VIEW EXPLODE(arr) exp AS val")
+    checkHiveQl("SELECT val, id FROM parquet_t3 LATERAL VIEW OUTER EXPLODE(arr) exp AS val")
+  }
+
+  test("generator in lateral view with ambiguous names") {
+    checkHiveQl(
+      """
+        |SELECT exp.id, parquet_t3.id
+        |FROM parquet_t3
+        |LATERAL VIEW EXPLODE(arr) exp AS id
+      """.stripMargin)
+    checkHiveQl(
+      """
+        |SELECT exp.id, parquet_t3.id
+        |FROM parquet_t3
+        |LATERAL VIEW OUTER EXPLODE(arr) exp AS id
+      """.stripMargin)
+  }
+
+  test("use JSON_TUPLE as generator") {
+    checkHiveQl(
+      """
+        |SELECT c0, c1, c2
+        |FROM parquet_t3
+        |LATERAL VIEW JSON_TUPLE(json, 'f1', 'f2', 'f3') jt
+      """.stripMargin)
+    checkHiveQl(
+      """
+        |SELECT a, b, c
+        |FROM parquet_t3
+        |LATERAL VIEW JSON_TUPLE(json, 'f1', 'f2', 'f3') jt AS a, b, c
+      """.stripMargin)
+  }
+
+  test("nested generator in lateral view") {
+    checkHiveQl(
+      """
+        |SELECT val, id
+        |FROM parquet_t3
+        |LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array
+        |LATERAL VIEW EXPLODE(nested_array) exp2 AS val
+      """.stripMargin)
+
+    checkHiveQl(
+      """
+        |SELECT val, id
+        |FROM parquet_t3
+        |LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array
+        |LATERAL VIEW OUTER EXPLODE(nested_array) exp2 AS val
+      """.stripMargin)
+  }
+
+  test("generate with other operators") {
+    checkHiveQl(
+      """
+        |SELECT EXPLODE(arr) AS val, id
+        |FROM parquet_t3
+        |WHERE id > 2
+        |ORDER BY val, id
+        |LIMIT 5
+      """.stripMargin)
+
+    checkHiveQl(
+      """
+        |SELECT val, id
+        |FROM parquet_t3
+        |LATERAL VIEW EXPLODE(arr2) exp1 AS nested_array
+        |LATERAL VIEW EXPLODE(nested_array) exp2 AS val
+        |WHERE val > 2
+        |ORDER BY val, id
+        |LIMIT 5
+      """.stripMargin)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org