You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/10/10 05:00:59 UTC

spark git commit: [SPARK-17741][SQL] Grammar to parse top level and nested data fields separately

Repository: spark
Updated Branches:
  refs/heads/master 26fbca480 -> 16590030c


[SPARK-17741][SQL] Grammar to parse top level and nested data fields separately

## What changes were proposed in this pull request?

Currently we use the same rule to parse top level and nested data fields. For example:
```
create table tbl_x(
  id bigint,
  nested struct<col1:string,col2:string>
)
```
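This example shows both syntaxes: the top-level columns are written as `name type`, while the nested struct fields are written as `name:type`. In this PR we split the rule into a top-level rule and a nested rule.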

Before this PR,
```
sql("CREATE TABLE my_tab(column1: INT)")
```
works fine.
After this PR, it will throw a `ParseException`:
```
scala> sql("CREATE TABLE my_tab(column1: INT)")
org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input 'CREATE TABLE my_tab(column1:'(line 1, pos 27)
```

## How was this patch tested?
Added new test cases to `SparkSqlParserSuite`.

Author: jiangxingbo <ji...@gmail.com>

Closes #15346 from jiangxb1987/cdt.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16590030
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16590030
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16590030

Branch: refs/heads/master
Commit: 16590030c15b32e83b584283697b6f783cffe043
Parents: 26fbca4
Author: jiangxingbo <ji...@gmail.com>
Authored: Sun Oct 9 22:00:54 2016 -0700
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Sun Oct 9 22:00:54 2016 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  12 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  32 +++-
 .../catalyst/parser/DataTypeParserSuite.scala   |  14 +-
 .../spark/sql/execution/SparkSqlParser.scala    |   4 +-
 .../sql/execution/SparkSqlParserSuite.scala     | 152 ++++++++++++++++++-
 .../spark/sql/execution/command/DDLSuite.scala  |   2 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |   2 +-
 7 files changed, 195 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6a94def..a3bbace 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -584,7 +584,7 @@ intervalValue
 dataType
     : complex=ARRAY '<' dataType '>'                            #complexDataType
     | complex=MAP '<' dataType ',' dataType '>'                 #complexDataType
-    | complex=STRUCT ('<' colTypeList? '>' | NEQ)               #complexDataType
+    | complex=STRUCT ('<' complexColTypeList? '>' | NEQ)        #complexDataType
     | identifier ('(' INTEGER_VALUE (',' INTEGER_VALUE)* ')')?  #primitiveDataType
     ;
 
@@ -593,7 +593,15 @@ colTypeList
     ;
 
 colType
-    : identifier ':'? dataType (COMMENT STRING)?
+    : identifier dataType (COMMENT STRING)?
+    ;
+
+complexColTypeList
+    : complexColType (',' complexColType)*
+    ;
+
+complexColType
+    : identifier ':' dataType (COMMENT STRING)?
     ;
 
 whenClause

http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index bf3f302..929c1c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -316,7 +316,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
         // Create the attributes.
         val (attributes, schemaLess) = if (colTypeList != null) {
           // Typed return columns.
-          (createStructType(colTypeList).toAttributes, false)
+          (createSchema(colTypeList).toAttributes, false)
         } else if (identifierSeq != null) {
           // Untyped return columns.
           val attrs = visitIdentifierSeq(identifierSeq).map { name =>
@@ -1450,14 +1450,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       case SqlBaseParser.MAP =>
         MapType(typedVisit(ctx.dataType(0)), typedVisit(ctx.dataType(1)))
       case SqlBaseParser.STRUCT =>
-        createStructType(ctx.colTypeList())
+        createStructType(ctx.complexColTypeList())
     }
   }
 
   /**
-   * Create a [[StructType]] from a sequence of [[StructField]]s.
+   * Create top level table schema.
    */
-  protected def createStructType(ctx: ColTypeListContext): StructType = {
+  protected def createSchema(ctx: ColTypeListContext): StructType = {
     StructType(Option(ctx).toSeq.flatMap(visitColTypeList))
   }
 
@@ -1476,4 +1476,28 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
     if (STRING == null) structField else structField.withComment(string(STRING))
   }
+
+  /**
+   * Create a [[StructType]] from a sequence of [[StructField]]s.
+   */
+  protected def createStructType(ctx: ComplexColTypeListContext): StructType = {
+    StructType(Option(ctx).toSeq.flatMap(visitComplexColTypeList))
+  }
+
+  /**
+   * Create a sequence of [[StructField]]s from a number of column definitions.
+   */
+  override def visitComplexColTypeList(
+      ctx: ComplexColTypeListContext): Seq[StructField] = withOrigin(ctx) {
+    ctx.complexColType().asScala.map(visitComplexColType)
+  }
+
+  /**
+   * Create a [[StructField]] from a column definition.
+   */
+  override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
+    import ctx._
+    val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
+    if (STRING == null) structField else structField.withComment(string(STRING))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
index 020fb16..3964fa3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
@@ -116,6 +116,7 @@ class DataTypeParserSuite extends SparkFunSuite {
   unsupported("it is not a data type")
   unsupported("struct<x+y: int, 1.1:timestamp>")
   unsupported("struct<x: int")
+  unsupported("struct<x int, y string>")
 
   // DataType parser accepts certain reserved keywords.
   checkDataType(
@@ -125,16 +126,11 @@ class DataTypeParserSuite extends SparkFunSuite {
         StructField("DATE", BooleanType, true) :: Nil)
   )
 
-  // Define struct columns without ':'
-  checkDataType(
-    "struct<x int, y string>",
-    (new StructType).add("x", IntegerType).add("y", StringType))
-
-  checkDataType(
-    "struct<`x``y` int>",
-    (new StructType).add("x`y", IntegerType))
-
   // Use SQL keywords.
   checkDataType("struct<end: long, select: int, from: string>",
     (new StructType).add("end", LongType).add("select", IntegerType).add("from", StringType))
+
+  // DataType parser accepts comments.
+  checkDataType("Struct<x: INT, y: STRING COMMENT 'test'>",
+    (new StructType).add("x", IntegerType).add("y", StringType, true, "test"))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 085bb9f..5f87b71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -340,7 +340,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     if (provider.toLowerCase == "hive") {
       throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING")
     }
-    val schema = Option(ctx.colTypeList()).map(createStructType)
+    val schema = Option(ctx.colTypeList()).map(createSchema)
     val partitionColumnNames =
       Option(ctx.partitionColumnNames)
         .map(visitIdentifierList(_).toArray)
@@ -399,7 +399,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       ctx: CreateTempViewUsingContext): LogicalPlan = withOrigin(ctx) {
     CreateTempViewUsing(
       tableIdent = visitTableIdentifier(ctx.tableIdentifier()),
-      userSpecifiedSchema = Option(ctx.colTypeList()).map(createStructType),
+      userSpecifiedSchema = Option(ctx.colTypeList()).map(createSchema),
       replace = ctx.REPLACE != null,
       provider = ctx.tableProvider.qualifiedName.getText,
       options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))

http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 6712d32..e0976ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -17,13 +17,17 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.{DescribeFunctionCommand, DescribeTableCommand,
   ShowFunctionsCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
 
 /**
  * Parser test cases for rules defined in [[SparkSqlParser]].
@@ -35,8 +39,23 @@ class SparkSqlParserSuite extends PlanTest {
 
   private lazy val parser = new SparkSqlParser(new SQLConf)
 
+  /**
+   * Normalizes plans:
+   * - CreateTable: the createTime in tableDesc will be replaced by -1L.
+   */
+  private def normalizePlan(plan: LogicalPlan): LogicalPlan = {
+    plan match {
+      case CreateTable(tableDesc, mode, query) =>
+        val newTableDesc = tableDesc.copy(createTime = -1L)
+        CreateTable(newTableDesc, mode, query)
+      case _ => plan // Don't transform
+    }
+  }
+
   private def assertEqual(sqlCommand: String, plan: LogicalPlan): Unit = {
-    comparePlans(parser.parsePlan(sqlCommand), plan)
+    val normalized1 = normalizePlan(parser.parsePlan(sqlCommand))
+    val normalized2 = normalizePlan(plan)
+    comparePlans(normalized1, normalized2)
   }
 
   private def intercept(sqlCommand: String, messages: String*): Unit = {
@@ -68,9 +87,134 @@ class SparkSqlParserSuite extends PlanTest {
       DescribeFunctionCommand(FunctionIdentifier("bar", database = None), isExtended = true))
     assertEqual("describe function foo.bar",
       DescribeFunctionCommand(
-        FunctionIdentifier("bar", database = Option("foo")), isExtended = false))
+        FunctionIdentifier("bar", database = Some("foo")), isExtended = false))
     assertEqual("describe function extended f.bar",
-      DescribeFunctionCommand(FunctionIdentifier("bar", database = Option("f")), isExtended = true))
+      DescribeFunctionCommand(FunctionIdentifier("bar", database = Some("f")), isExtended = true))
+  }
+
+  private def createTableUsing(
+      table: String,
+      database: Option[String] = None,
+      tableType: CatalogTableType = CatalogTableType.MANAGED,
+      storage: CatalogStorageFormat = CatalogStorageFormat.empty,
+      schema: StructType = new StructType,
+      provider: Option[String] = Some("parquet"),
+      partitionColumnNames: Seq[String] = Seq.empty,
+      bucketSpec: Option[BucketSpec] = None,
+      mode: SaveMode = SaveMode.ErrorIfExists,
+      query: Option[LogicalPlan] = None): CreateTable = {
+    CreateTable(
+      CatalogTable(
+        identifier = TableIdentifier(table, database),
+        tableType = tableType,
+        storage = storage,
+        schema = schema,
+        provider = provider,
+        partitionColumnNames = partitionColumnNames,
+        bucketSpec = bucketSpec
+      ), mode, query
+    )
+  }
+
+  private def createTempViewUsing(
+      table: String,
+      database: Option[String] = None,
+      schema: Option[StructType] = None,
+      replace: Boolean = true,
+      provider: String = "parquet",
+      options: Map[String, String] = Map.empty): LogicalPlan = {
+    CreateTempViewUsing(TableIdentifier(table, database), schema, replace, provider, options)
+  }
+
+  private def createTable(
+      table: String,
+      database: Option[String] = None,
+      tableType: CatalogTableType = CatalogTableType.MANAGED,
+      storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy(
+        inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat,
+        outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat),
+      schema: StructType = new StructType,
+      provider: Option[String] = Some("hive"),
+      partitionColumnNames: Seq[String] = Seq.empty,
+      comment: Option[String] = None,
+      mode: SaveMode = SaveMode.ErrorIfExists,
+      query: Option[LogicalPlan] = None): CreateTable = {
+    CreateTable(
+      CatalogTable(
+        identifier = TableIdentifier(table, database),
+        tableType = tableType,
+        storage = storage,
+        schema = schema,
+        provider = provider,
+        partitionColumnNames = partitionColumnNames,
+        comment = comment
+      ), mode, query
+    )
+  }
+
+  test("create table - schema") {
+    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)",
+      createTable(
+        table = "my_tab",
+        schema = (new StructType)
+          .add("a", IntegerType, nullable = true, "test")
+          .add("b", StringType)
+      )
+    )
+    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
+      "PARTITIONED BY (c INT, d STRING COMMENT 'test2')",
+      createTable(
+        table = "my_tab",
+        schema = (new StructType)
+          .add("a", IntegerType, nullable = true, "test")
+          .add("b", StringType)
+          .add("c", IntegerType)
+          .add("d", StringType, nullable = true, "test2"),
+        partitionColumnNames = Seq("c", "d")
+      )
+    )
+    assertEqual("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>)",
+      createTable(
+        table = "my_tab",
+        schema = (new StructType)
+          .add("id", LongType)
+          .add("nested", (new StructType)
+            .add("col1", StringType)
+            .add("col2", IntegerType)
+          )
+      )
+    )
+    // Partitioned by a StructType should be accepted by `SparkSqlParser` but will fail an analyze
+    // rule in `AnalyzeCreateTable`.
+    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
+      "PARTITIONED BY (nested STRUCT<col1: STRING,col2: INT>)",
+      createTable(
+        table = "my_tab",
+        schema = (new StructType)
+          .add("a", IntegerType, nullable = true, "test")
+          .add("b", StringType)
+          .add("nested", (new StructType)
+            .add("col1", StringType)
+            .add("col2", IntegerType)
+          ),
+        partitionColumnNames = Seq("nested")
+      )
+    )
+    intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING)",
+      "no viable alternative at input")
+  }
+
+  test("create table using - schema") {
+    assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet",
+      createTableUsing(
+        table = "my_tab",
+        schema = (new StructType)
+          .add("a", IntegerType, nullable = true, "test")
+          .add("b", StringType)
+      )
+    )
+    intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING parquet",
+      "no viable alternative at input")
   }
 
   test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") {

http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b5499f2..1bcb810 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -642,7 +642,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val csvFile =
       Thread.currentThread().getContextClassLoader.getResource("test-data/cars.csv").toString
     withView("testview") {
-      sql(s"CREATE OR REPLACE TEMPORARY VIEW testview (c1: String, c2: String)  USING " +
+      sql(s"CREATE OR REPLACE TEMPORARY VIEW testview (c1 String, c2 String)  USING " +
         "org.apache.spark.sql.execution.datasources.csv.CSVFileFormat  " +
         s"OPTIONS (PATH '$csvFile')")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 54e27b6..9ce3338 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -243,7 +243,7 @@ class HiveDDLCommandSuite extends PlanTest {
       .asInstanceOf[ScriptTransformation].copy(ioschema = null)
     val plan2 = parser.parsePlan("map a, b using 'func' as c, d from e")
       .asInstanceOf[ScriptTransformation].copy(ioschema = null)
-    val plan3 = parser.parsePlan("reduce a, b using 'func' as (c: int, d decimal(10, 0)) from e")
+    val plan3 = parser.parsePlan("reduce a, b using 'func' as (c int, d decimal(10, 0)) from e")
       .asInstanceOf[ScriptTransformation].copy(ioschema = null)
 
     val p = ScriptTransformation(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org