You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/10/10 05:00:59 UTC
spark git commit: [SPARK-17741][SQL] Grammar to parse top level and
nested data fields separately
Repository: spark
Updated Branches:
refs/heads/master 26fbca480 -> 16590030c
[SPARK-17741][SQL] Grammar to parse top level and nested data fields separately
## What changes were proposed in this pull request?
Currently we use the same rule to parse top-level and nested data fields. For example:
```
create table tbl_x(
id bigint,
nested struct<col1:string,col2:string>
)
```
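The example above shows both syntaxes: the top-level columns are written as `name type` (e.g. `id bigint`), while the nested struct fields are written as `name:type` (e.g. `col1:string`). In this PR we split this rule into a top-level rule and a nested rule.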
Before this PR,
```
sql("CREATE TABLE my_tab(column1: INT)")
```
works fine.
After this PR, it will throw a `ParseException`:
```
scala> sql("CREATE TABLE my_tab(column1: INT)")
org.apache.spark.sql.catalyst.parser.ParseException:
no viable alternative at input 'CREATE TABLE my_tab(column1:'(line 1, pos 27)
```
## How was this patch tested?
Added new test cases to `SparkSqlParserSuite`.
Author: jiangxingbo <ji...@gmail.com>
Closes #15346 from jiangxb1987/cdt.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16590030
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16590030
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16590030
Branch: refs/heads/master
Commit: 16590030c15b32e83b584283697b6f783cffe043
Parents: 26fbca4
Author: jiangxingbo <ji...@gmail.com>
Authored: Sun Oct 9 22:00:54 2016 -0700
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Sun Oct 9 22:00:54 2016 -0700
----------------------------------------------------------------------
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 12 +-
.../spark/sql/catalyst/parser/AstBuilder.scala | 32 +++-
.../catalyst/parser/DataTypeParserSuite.scala | 14 +-
.../spark/sql/execution/SparkSqlParser.scala | 4 +-
.../sql/execution/SparkSqlParserSuite.scala | 152 ++++++++++++++++++-
.../spark/sql/execution/command/DDLSuite.scala | 2 +-
.../spark/sql/hive/HiveDDLCommandSuite.scala | 2 +-
7 files changed, 195 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 6a94def..a3bbace 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -584,7 +584,7 @@ intervalValue
dataType
: complex=ARRAY '<' dataType '>' #complexDataType
| complex=MAP '<' dataType ',' dataType '>' #complexDataType
- | complex=STRUCT ('<' colTypeList? '>' | NEQ) #complexDataType
+ | complex=STRUCT ('<' complexColTypeList? '>' | NEQ) #complexDataType
| identifier ('(' INTEGER_VALUE (',' INTEGER_VALUE)* ')')? #primitiveDataType
;
@@ -593,7 +593,15 @@ colTypeList
;
colType
- : identifier ':'? dataType (COMMENT STRING)?
+ : identifier dataType (COMMENT STRING)?
+ ;
+
+complexColTypeList
+ : complexColType (',' complexColType)*
+ ;
+
+complexColType
+ : identifier ':' dataType (COMMENT STRING)?
;
whenClause
http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index bf3f302..929c1c4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -316,7 +316,7 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
// Create the attributes.
val (attributes, schemaLess) = if (colTypeList != null) {
// Typed return columns.
- (createStructType(colTypeList).toAttributes, false)
+ (createSchema(colTypeList).toAttributes, false)
} else if (identifierSeq != null) {
// Untyped return columns.
val attrs = visitIdentifierSeq(identifierSeq).map { name =>
@@ -1450,14 +1450,14 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
case SqlBaseParser.MAP =>
MapType(typedVisit(ctx.dataType(0)), typedVisit(ctx.dataType(1)))
case SqlBaseParser.STRUCT =>
- createStructType(ctx.colTypeList())
+ createStructType(ctx.complexColTypeList())
}
}
/**
- * Create a [[StructType]] from a sequence of [[StructField]]s.
+ * Create top level table schema.
*/
- protected def createStructType(ctx: ColTypeListContext): StructType = {
+ protected def createSchema(ctx: ColTypeListContext): StructType = {
StructType(Option(ctx).toSeq.flatMap(visitColTypeList))
}
@@ -1476,4 +1476,28 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
if (STRING == null) structField else structField.withComment(string(STRING))
}
+
+ /**
+ * Create a [[StructType]] from a sequence of [[StructField]]s.
+ */
+ protected def createStructType(ctx: ComplexColTypeListContext): StructType = {
+ StructType(Option(ctx).toSeq.flatMap(visitComplexColTypeList))
+ }
+
+ /**
+ * Create a [[StructType]] from a number of column definitions.
+ */
+ override def visitComplexColTypeList(
+ ctx: ComplexColTypeListContext): Seq[StructField] = withOrigin(ctx) {
+ ctx.complexColType().asScala.map(visitComplexColType)
+ }
+
+ /**
+ * Create a [[StructField]] from a column definition.
+ */
+ override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
+ import ctx._
+ val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
+ if (STRING == null) structField else structField.withComment(string(STRING))
+ }
}
http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
index 020fb16..3964fa3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DataTypeParserSuite.scala
@@ -116,6 +116,7 @@ class DataTypeParserSuite extends SparkFunSuite {
unsupported("it is not a data type")
unsupported("struct<x+y: int, 1.1:timestamp>")
unsupported("struct<x: int")
+ unsupported("struct<x int, y string>")
// DataType parser accepts certain reserved keywords.
checkDataType(
@@ -125,16 +126,11 @@ class DataTypeParserSuite extends SparkFunSuite {
StructField("DATE", BooleanType, true) :: Nil)
)
- // Define struct columns without ':'
- checkDataType(
- "struct<x int, y string>",
- (new StructType).add("x", IntegerType).add("y", StringType))
-
- checkDataType(
- "struct<`x``y` int>",
- (new StructType).add("x`y", IntegerType))
-
// Use SQL keywords.
checkDataType("struct<end: long, select: int, from: string>",
(new StructType).add("end", LongType).add("select", IntegerType).add("from", StringType))
+
+ // DataType parser accepts comments.
+ checkDataType("Struct<x: INT, y: STRING COMMENT 'test'>",
+ (new StructType).add("x", IntegerType).add("y", StringType, true, "test"))
}
http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 085bb9f..5f87b71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -340,7 +340,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (provider.toLowerCase == "hive") {
throw new AnalysisException("Cannot create hive serde table with CREATE TABLE USING")
}
- val schema = Option(ctx.colTypeList()).map(createStructType)
+ val schema = Option(ctx.colTypeList()).map(createSchema)
val partitionColumnNames =
Option(ctx.partitionColumnNames)
.map(visitIdentifierList(_).toArray)
@@ -399,7 +399,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx: CreateTempViewUsingContext): LogicalPlan = withOrigin(ctx) {
CreateTempViewUsing(
tableIdent = visitTableIdentifier(ctx.tableIdentifier()),
- userSpecifiedSchema = Option(ctx.colTypeList()).map(createStructType),
+ userSpecifiedSchema = Option(ctx.colTypeList()).map(createSchema),
replace = ctx.REPLACE != null,
provider = ctx.tableProvider.qualifiedName.getText,
options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 6712d32..e0976ae 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -17,13 +17,17 @@
package org.apache.spark.sql.execution
+import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.command.{DescribeFunctionCommand, DescribeTableCommand,
ShowFunctionsCommand}
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing}
+import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
/**
* Parser test cases for rules defined in [[SparkSqlParser]].
@@ -35,8 +39,23 @@ class SparkSqlParserSuite extends PlanTest {
private lazy val parser = new SparkSqlParser(new SQLConf)
+ /**
+ * Normalizes plans:
+ * - CreateTable: the createTime in tableDesc will be replaced by -1L.
+ */
+ private def normalizePlan(plan: LogicalPlan): LogicalPlan = {
+ plan match {
+ case CreateTable(tableDesc, mode, query) =>
+ val newTableDesc = tableDesc.copy(createTime = -1L)
+ CreateTable(newTableDesc, mode, query)
+ case _ => plan // Don't transform
+ }
+ }
+
private def assertEqual(sqlCommand: String, plan: LogicalPlan): Unit = {
- comparePlans(parser.parsePlan(sqlCommand), plan)
+ val normalized1 = normalizePlan(parser.parsePlan(sqlCommand))
+ val normalized2 = normalizePlan(plan)
+ comparePlans(normalized1, normalized2)
}
private def intercept(sqlCommand: String, messages: String*): Unit = {
@@ -68,9 +87,134 @@ class SparkSqlParserSuite extends PlanTest {
DescribeFunctionCommand(FunctionIdentifier("bar", database = None), isExtended = true))
assertEqual("describe function foo.bar",
DescribeFunctionCommand(
- FunctionIdentifier("bar", database = Option("foo")), isExtended = false))
+ FunctionIdentifier("bar", database = Some("foo")), isExtended = false))
assertEqual("describe function extended f.bar",
- DescribeFunctionCommand(FunctionIdentifier("bar", database = Option("f")), isExtended = true))
+ DescribeFunctionCommand(FunctionIdentifier("bar", database = Some("f")), isExtended = true))
+ }
+
+ private def createTableUsing(
+ table: String,
+ database: Option[String] = None,
+ tableType: CatalogTableType = CatalogTableType.MANAGED,
+ storage: CatalogStorageFormat = CatalogStorageFormat.empty,
+ schema: StructType = new StructType,
+ provider: Option[String] = Some("parquet"),
+ partitionColumnNames: Seq[String] = Seq.empty,
+ bucketSpec: Option[BucketSpec] = None,
+ mode: SaveMode = SaveMode.ErrorIfExists,
+ query: Option[LogicalPlan] = None): CreateTable = {
+ CreateTable(
+ CatalogTable(
+ identifier = TableIdentifier(table, database),
+ tableType = tableType,
+ storage = storage,
+ schema = schema,
+ provider = provider,
+ partitionColumnNames = partitionColumnNames,
+ bucketSpec = bucketSpec
+ ), mode, query
+ )
+ }
+
+ private def createTempViewUsing(
+ table: String,
+ database: Option[String] = None,
+ schema: Option[StructType] = None,
+ replace: Boolean = true,
+ provider: String = "parquet",
+ options: Map[String, String] = Map.empty): LogicalPlan = {
+ CreateTempViewUsing(TableIdentifier(table, database), schema, replace, provider, options)
+ }
+
+ private def createTable(
+ table: String,
+ database: Option[String] = None,
+ tableType: CatalogTableType = CatalogTableType.MANAGED,
+ storage: CatalogStorageFormat = CatalogStorageFormat.empty.copy(
+ inputFormat = HiveSerDe.sourceToSerDe("textfile").get.inputFormat,
+ outputFormat = HiveSerDe.sourceToSerDe("textfile").get.outputFormat),
+ schema: StructType = new StructType,
+ provider: Option[String] = Some("hive"),
+ partitionColumnNames: Seq[String] = Seq.empty,
+ comment: Option[String] = None,
+ mode: SaveMode = SaveMode.ErrorIfExists,
+ query: Option[LogicalPlan] = None): CreateTable = {
+ CreateTable(
+ CatalogTable(
+ identifier = TableIdentifier(table, database),
+ tableType = tableType,
+ storage = storage,
+ schema = schema,
+ provider = provider,
+ partitionColumnNames = partitionColumnNames,
+ comment = comment
+ ), mode, query
+ )
+ }
+
+ test("create table - schema") {
+ assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING)",
+ createTable(
+ table = "my_tab",
+ schema = (new StructType)
+ .add("a", IntegerType, nullable = true, "test")
+ .add("b", StringType)
+ )
+ )
+ assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
+ "PARTITIONED BY (c INT, d STRING COMMENT 'test2')",
+ createTable(
+ table = "my_tab",
+ schema = (new StructType)
+ .add("a", IntegerType, nullable = true, "test")
+ .add("b", StringType)
+ .add("c", IntegerType)
+ .add("d", StringType, nullable = true, "test2"),
+ partitionColumnNames = Seq("c", "d")
+ )
+ )
+ assertEqual("CREATE TABLE my_tab(id BIGINT, nested STRUCT<col1: STRING,col2: INT>)",
+ createTable(
+ table = "my_tab",
+ schema = (new StructType)
+ .add("id", LongType)
+ .add("nested", (new StructType)
+ .add("col1", StringType)
+ .add("col2", IntegerType)
+ )
+ )
+ )
+ // Partitioned by a StructType should be accepted by `SparkSqlParser` but will fail an analyze
+ // rule in `AnalyzeCreateTable`.
+ assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) " +
+ "PARTITIONED BY (nested STRUCT<col1: STRING,col2: INT>)",
+ createTable(
+ table = "my_tab",
+ schema = (new StructType)
+ .add("a", IntegerType, nullable = true, "test")
+ .add("b", StringType)
+ .add("nested", (new StructType)
+ .add("col1", StringType)
+ .add("col2", IntegerType)
+ ),
+ partitionColumnNames = Seq("nested")
+ )
+ )
+ intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING)",
+ "no viable alternative at input")
+ }
+
+ test("create table using - schema") {
+ assertEqual("CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet",
+ createTableUsing(
+ table = "my_tab",
+ schema = (new StructType)
+ .add("a", IntegerType, nullable = true, "test")
+ .add("b", StringType)
+ )
+ )
+ intercept("CREATE TABLE my_tab(a: INT COMMENT 'test', b: STRING) USING parquet",
+ "no viable alternative at input")
}
test("SPARK-17328 Fix NPE with EXPLAIN DESCRIBE TABLE") {
http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index b5499f2..1bcb810 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -642,7 +642,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val csvFile =
Thread.currentThread().getContextClassLoader.getResource("test-data/cars.csv").toString
withView("testview") {
- sql(s"CREATE OR REPLACE TEMPORARY VIEW testview (c1: String, c2: String) USING " +
+ sql(s"CREATE OR REPLACE TEMPORARY VIEW testview (c1 String, c2 String) USING " +
"org.apache.spark.sql.execution.datasources.csv.CSVFileFormat " +
s"OPTIONS (PATH '$csvFile')")
http://git-wip-us.apache.org/repos/asf/spark/blob/16590030/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index 54e27b6..9ce3338 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -243,7 +243,7 @@ class HiveDDLCommandSuite extends PlanTest {
.asInstanceOf[ScriptTransformation].copy(ioschema = null)
val plan2 = parser.parsePlan("map a, b using 'func' as c, d from e")
.asInstanceOf[ScriptTransformation].copy(ioschema = null)
- val plan3 = parser.parsePlan("reduce a, b using 'func' as (c: int, d decimal(10, 0)) from e")
+ val plan3 = parser.parsePlan("reduce a, b using 'func' as (c int, d decimal(10, 0)) from e")
.asInstanceOf[ScriptTransformation].copy(ioschema = null)
val p = ScriptTransformation(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org