Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/16 21:11:29 UTC

spark git commit: [SPARK-12721][SQL] SQL Generation for Script Transformation

Repository: spark
Updated Branches:
  refs/heads/master 1d1de28a3 -> c4bd57602


[SPARK-12721][SQL] SQL Generation for Script Transformation

#### What changes were proposed in this pull request?

This PR adds SQL generation for analyzed logical plans that contain the `ScriptTransformation` operator.

For example, below is a SQL query containing a `TRANSFORM` clause:
```
SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2
```

Its analyzed logical plan looks like this:
```
ScriptTransformation [a#210L,b#211L,c#212L,d#213L], cat, [key#208,value#209], HiveScriptIOSchema(List(),List(),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),Some(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe),List((field.delim,	)),List((field.delim,	)),Some(org.apache.hadoop.hive.ql.exec.TextRecordReader),Some(org.apache.hadoop.hive.ql.exec.TextRecordWriter),true)
+- SubqueryAlias parquet_t2
   +- Relation[a#210L,b#211L,c#212L,d#213L] ParquetRelation
```

The generated SQL looks like this:
```
SELECT TRANSFORM (`parquet_t2`.`a`, `parquet_t2`.`b`, `parquet_t2`.`c`, `parquet_t2`.`d`) USING 'cat' AS (`key` string, `value` string) FROM `default`.`parquet_t2`
```
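
For those who want to reproduce the round trip, a minimal sketch (assuming a `HiveContext` named `sqlContext` with `parquet_t2` already registered, as in `LogicalPlanToSQLSuite`):
```
// Hypothetical usage, not part of this patch: regenerate SQL from an
// analyzed plan via the SQLBuilder constructor shown in the diff below.
import org.apache.spark.sql.hive.SQLBuilder

val df = sqlContext.sql("SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2")
val regenerated = new SQLBuilder(df.queryExecution.analyzed, sqlContext).toSQL
println(regenerated)
// SELECT TRANSFORM (`parquet_t2`.`a`, `parquet_t2`.`b`, `parquet_t2`.`c`, `parquet_t2`.`d`)
// USING 'cat' AS (`key` string, `value` string) FROM `default`.`parquet_t2`
```
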
#### How was this patch tested?

Seven test cases are added to `LogicalPlanToSQLSuite`.

Author: gatorsmile <ga...@gmail.com>
Author: xiaoli <li...@gmail.com>
Author: Xiao Li <xi...@Xiaos-MacBook-Pro.local>

Closes #11503 from gatorsmile/transformToSQL.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c4bd5760
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c4bd5760
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c4bd5760

Branch: refs/heads/master
Commit: c4bd57602c0b14188d364bb475631bf473d25082
Parents: 1d1de28
Author: gatorsmile <ga...@gmail.com>
Authored: Wed Mar 16 13:11:11 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 16 13:11:11 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/hive/SQLBuilder.scala  | 29 ++++++++++
 .../hive/execution/ScriptTransformation.scala   | 48 +++++++++++++++++
 .../spark/sql/hive/LogicalPlanToSQLSuite.scala  | 57 ++++++++++++++++++++
 3 files changed, 134 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c4bd5760/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 3bc8e9a..f3446a3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.execution.HiveScriptIOSchema
 import org.apache.spark.sql.types.{ByteType, DataType, IntegerType, NullType}
 
 /**
@@ -184,6 +185,9 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         p.partitionExpressions.map(_.sql).mkString(", ")
       )
 
+    case p: ScriptTransformation =>
+      scriptTransformationToSQL(p)
+
     case OneRowRelation =>
       ""
 
@@ -209,6 +213,31 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     )
   }
 
+  private def scriptTransformationToSQL(plan: ScriptTransformation): String = {
+    val ioSchema = plan.ioschema.asInstanceOf[HiveScriptIOSchema]
+    val inputRowFormatSQL = ioSchema.inputRowFormatSQL.getOrElse(
+      throw new UnsupportedOperationException(
+        s"unsupported row format ${ioSchema.inputRowFormat}"))
+    val outputRowFormatSQL = ioSchema.outputRowFormatSQL.getOrElse(
+      throw new UnsupportedOperationException(
+        s"unsupported row format ${ioSchema.outputRowFormat}"))
+
+    val outputSchema = plan.output.map { attr =>
+      s"${attr.sql} ${attr.dataType.simpleString}"
+    }.mkString(", ")
+
+    build(
+      "SELECT TRANSFORM",
+      "(" + plan.input.map(_.sql).mkString(", ") + ")",
+      inputRowFormatSQL,
+      s"USING \'${plan.script}\'",
+      "AS (" + outputSchema + ")",
+      outputRowFormatSQL,
+      if (plan.child == OneRowRelation) "" else "FROM",
+      toSQL(plan.child)
+    )
+  }
+
   private def aggregateToSQL(plan: Aggregate): String = {
     val groupingSQL = plan.groupingExpressions.map(_.sql).mkString(", ")
     build(

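The `build` helper used by `scriptTransformationToSQL` is defined elsewhere in `SQLBuilder`; a rough sketch of its assumed behavior (not part of this diff) explains why empty segments, such as the schemaless row-format SQL or the `""` emitted for a `OneRowRelation` child, simply vanish from the generated query:
```
// Assumed shape of SQLBuilder.build: join the non-empty fragments with
// single spaces so optional clauses can be passed in as "".
private def build(segments: String*): String =
  segments.map(_.trim).filter(_.nonEmpty).mkString(" ")
```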
http://git-wip-us.apache.org/repos/asf/spark/blob/c4bd5760/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 3b53716..62e7c12 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -400,4 +400,52 @@ case class HiveScriptIOSchema (
       instance
     }
   }
+
+  def inputRowFormatSQL: Option[String] =
+    getRowFormatSQL(inputRowFormat, inputSerdeClass, inputSerdeProps)
+
+  def outputRowFormatSQL: Option[String] =
+    getRowFormatSQL(outputRowFormat, outputSerdeClass, outputSerdeProps)
+
+  /**
+   * Get the row format specification
+   * Note:
+   * 1. Changes are needed when readerClause and writerClause are supported.
+   * 2. Changes are needed when "ESCAPED BY" is supported.
+   */
+  private def getRowFormatSQL(
+      rowFormat: Seq[(String, String)],
+      serdeClass: Option[String],
+      serdeProps: Seq[(String, String)]): Option[String] = {
+    if (schemaLess) return Some("")
+
+    val rowFormatDelimited =
+      rowFormat.map {
+        case ("TOK_TABLEROWFORMATFIELD", value) =>
+          "FIELDS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATCOLLITEMS", value) =>
+          "COLLECTION ITEMS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATMAPKEYS", value) =>
+          "MAP KEYS TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATLINES", value) =>
+          "LINES TERMINATED BY " + value
+        case ("TOK_TABLEROWFORMATNULL", value) =>
+          "NULL DEFINED AS " + value
+        case o => return None
+      }
+
+    val serdeClassSQL = serdeClass.map("'" + _ + "'").getOrElse("")
+    val serdePropsSQL =
+      if (serdeClass.nonEmpty) {
+        val props = serdeProps.map{p => s"'${p._1}' = '${p._2}'"}.mkString(", ")
+        if (props.nonEmpty) " WITH SERDEPROPERTIES(" + props + ")" else ""
+      } else {
+        ""
+      }
+    if (rowFormat.nonEmpty) {
+      Some("ROW FORMAT DELIMITED " + rowFormatDelimited.mkString(" "))
+    } else {
+      Some("ROW FORMAT SERDE " + serdeClassSQL + serdePropsSQL)
+    }
+  }
 }

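To make the new accessors concrete, a hedged example (the case-class field names are read off `HiveScriptIOSchema` as used in this file, the token names come from Hive's parser, and the values here are illustrative):
```
// Illustrative only: a delimited row format and what inputRowFormatSQL yields.
import org.apache.spark.sql.hive.execution.HiveScriptIOSchema

val delimited = HiveScriptIOSchema(
  inputRowFormat = Seq(("TOK_TABLEROWFORMATFIELD", """'\t'""")),
  outputRowFormat = Seq(("TOK_TABLEROWFORMATFIELD", """'\t'""")),
  inputSerdeClass = None,
  outputSerdeClass = None,
  inputSerdeProps = Nil,
  outputSerdeProps = Nil,
  recordReaderClass = None,
  recordWriterClass = None,
  schemaLess = false)

println(delimited.inputRowFormatSQL)
// Some(ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t')
```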
http://git-wip-us.apache.org/repos/asf/spark/blob/c4bd5760/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index f02ecb4..ca46c22 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -383,6 +383,63 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
     }
   }
 
+  test("script transformation - schemaless") {
+    checkHiveQl("SELECT TRANSFORM (a, b, c, d) USING 'cat' FROM parquet_t2")
+    checkHiveQl("SELECT TRANSFORM (*) USING 'cat' FROM parquet_t2")
+  }
+
+  test("script transformation - alias list") {
+    checkHiveQl("SELECT TRANSFORM (a, b, c, d) USING 'cat' AS (d1, d2, d3, d4) FROM parquet_t2")
+  }
+
+  test("script transformation - alias list with type") {
+    checkHiveQl(
+      """FROM
+        |(FROM parquet_t1 SELECT TRANSFORM(key, value) USING 'cat' AS (thing1 int, thing2 string)) t
+        |SELECT thing1 + 1
+      """.stripMargin)
+  }
+
+  test("script transformation - row format delimited clause with only one format property") {
+    checkHiveQl(
+      """SELECT TRANSFORM (key) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
+        |USING 'cat' AS (tKey) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
+        |FROM parquet_t1
+      """.stripMargin)
+  }
+
+  test("script transformation - row format delimited clause with multiple format properties") {
+    checkHiveQl(
+      """SELECT TRANSFORM (key)
+        |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\t'
+        |USING 'cat' AS (tKey)
+        |ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\t'
+        |FROM parquet_t1
+      """.stripMargin)
+  }
+
+  test("script transformation - row format serde clauses with SERDEPROPERTIES") {
+    checkHiveQl(
+      """SELECT TRANSFORM (key, value)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+        |WITH SERDEPROPERTIES('field.delim' = '|')
+        |USING 'cat' AS (tKey, tValue)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+        |WITH SERDEPROPERTIES('field.delim' = '|')
+        |FROM parquet_t1
+      """.stripMargin)
+  }
+
+  test("script transformation - row format serde clauses without SERDEPROPERTIES") {
+    checkHiveQl(
+      """SELECT TRANSFORM (key, value)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+        |USING 'cat' AS (tKey, tValue)
+        |ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
+        |FROM parquet_t1
+      """.stripMargin)
+  }
+
   test("plans with non-SQL expressions") {
     sqlContext.udf.register("foo", (_: Int) * 2)
     intercept[UnsupportedOperationException](new SQLBuilder(sql("SELECT foo(id) FROM t0")).toSQL)

