Posted to commits@spark.apache.org by ge...@apache.org on 2022/03/07 09:06:26 UTC

[spark] branch master updated: [SPARK-38335][SQL] Implement parser support for DEFAULT column values

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e21cb62  [SPARK-38335][SQL] Implement parser support for DEFAULT column values
e21cb62 is described below

commit e21cb62d02c85a66771822cdd49c49dbb3e44502
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Mon Mar 7 17:05:19 2022 +0800

    [SPARK-38335][SQL] Implement parser support for DEFAULT column values
    
    ### What changes were proposed in this pull request?
    
    Implement the parser changes needed to support DEFAULT column values, as tracked in https://issues.apache.org/jira/browse/SPARK-38334.
    
    Note that these are the parser changes only; analysis support will follow in a separate PR.
    
    Background: in the future, CREATE TABLE and ALTER TABLE invocations will support setting column default values for later operations. Subsequent INSERT, UPDATE, and MERGE statements may then reference a column's default value with the DEFAULT keyword as needed.
    
    Examples:
    ```sql
    CREATE TABLE T(a INT, b INT NOT NULL);
    
    -- If no default is specified, the default is NULL
    INSERT INTO T VALUES (DEFAULT, 0);
    INSERT INTO T(b) VALUES (1);
    SELECT * FROM T;
    (NULL, 0)
    (NULL, 1)
    
    -- Adding a default to a table that already has rows sets the value for the
    -- existing rows (the "exists default") and for new rows (the "current default").
    ALTER TABLE T ADD COLUMN c INT DEFAULT 5;
    INSERT INTO T VALUES (1, 2, DEFAULT);
    SELECT * FROM T;
    (NULL, 0, 5)
    (NULL, 1, 5)
    (1, 2, 5)
    ```
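    
    The grammar also accepts DEFAULT clauses in ALTER TABLE ... ALTER COLUMN, as
    exercised in the new tests. In this PR, every DDL use of DEFAULT parses but
    then raises an explicit "not implemented yet" error until analysis support lands:
    ```sql
    -- These parse successfully, but currently raise:
    -- "Support for DEFAULT column values is not implemented yet"
    ALTER TABLE T ALTER COLUMN c SET DEFAULT 42;
    ALTER TABLE T ALTER COLUMN c DROP DEFAULT;
    ```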
    
    ### Why are the changes needed?
    
    This new syntax lets users declare default values for table columns in DDL statements and then reference those values from later DML statements with the DEFAULT keyword, instead of repeating them by hand.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit test coverage in DDLParserSuite.scala
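    
    As a rough sketch of what the new tests verify (not part of this patch), the
    behavior can be reproduced directly against the Catalyst parser:
    ```scala
    import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
    
    // DEFAULT inside a VALUES list parses as an unresolved attribute reference,
    // to be resolved during later phases of analysis.
    CatalystSqlParser.parsePlan("VALUES (1, 2, DEFAULT) AS val")
    
    // DEFAULT in DDL is recognized by the grammar but rejected for now.
    try {
      CatalystSqlParser.parsePlan(
        "CREATE TABLE my_tab(a INT, b STRING DEFAULT 'abc') USING parquet")
    } catch {
      case e: ParseException =>
        assert(e.getMessage.contains(
          "Support for DEFAULT column values is not implemented yet"))
    }
    ```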
    
    Closes #35690 from dtenedor/default-cols-parsing.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 docs/sql-ref-ansi-compliance.md                    |  1 +
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |  1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     | 22 ++++++--
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 61 +++++++++++++++++++++-
 .../spark/sql/errors/QueryParsingErrors.scala      |  4 ++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 53 +++++++++++++++++++
 .../spark/sql/execution/SparkSqlParser.scala       |  2 +-
 7 files changed, 138 insertions(+), 6 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 7646206..5b18719 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -396,6 +396,7 @@ Below is a list of all the keywords in Spark SQL.
 |DATE_DIFF|non-reserved|non-reserved|non-reserved|
 |DAY|non-reserved|non-reserved|non-reserved|
 |DBPROPERTIES|non-reserved|non-reserved|non-reserved|
+|DEFAULT|non-reserved|non-reserved|non-reserved|
 |DEFINED|non-reserved|non-reserved|non-reserved|
 |DELETE|non-reserved|non-reserved|reserved|
 |DELIMITED|non-reserved|non-reserved|non-reserved|
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 18bb19e..a6e1b65 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -147,6 +147,7 @@ DATE_ADD: 'DATE_ADD';
 DATEDIFF: 'DATEDIFF';
 DATE_DIFF: 'DATE_DIFF';
 DBPROPERTIES: 'DBPROPERTIES';
+DEFAULT: 'DEFAULT';
 DEFINED: 'DEFINED';
 DELETE: 'DELETE';
 DELIMITED: 'DELIMITED';
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 76a0033..b3b8347 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -83,7 +83,7 @@ statement
         (RESTRICT | CASCADE)?                                          #dropNamespace
     | SHOW namespaces ((FROM | IN) multipartIdentifier)?
         (LIKE? pattern=STRING)?                                        #showNamespaces
-    | createTableHeader (LEFT_PAREN colTypeList RIGHT_PAREN)? tableProvider?
+    | createTableHeader (LEFT_PAREN createOrReplaceTableColTypeList RIGHT_PAREN)? tableProvider?
         createTableClauses
         (AS? query)?                                                   #createTable
     | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
@@ -93,7 +93,7 @@ statement
         createFileFormat |
         locationSpec |
         (TBLPROPERTIES tableProps=propertyList))*                      #createTableLike
-    | replaceTableHeader (LEFT_PAREN colTypeList RIGHT_PAREN)? tableProvider?
+    | replaceTableHeader (LEFT_PAREN createOrReplaceTableColTypeList RIGHT_PAREN)? tableProvider?
         createTableClauses
         (AS? query)?                                                   #replaceTable
     | ANALYZE TABLE multipartIdentifier partitionSpec? COMPUTE STATISTICS
@@ -911,7 +911,11 @@ qualifiedColTypeWithPositionList
     ;
 
 qualifiedColTypeWithPosition
-    : name=multipartIdentifier dataType (NOT NULL)? commentSpec? colPosition?
+    : name=multipartIdentifier dataType (NOT NULL)? defaultExpression? commentSpec? colPosition?
+    ;
+
+defaultExpression
+    : DEFAULT expression
     ;
 
 colTypeList
@@ -922,6 +926,14 @@ colType
     : colName=errorCapturingIdentifier dataType (NOT NULL)? commentSpec?
     ;
 
+createOrReplaceTableColTypeList
+    : createOrReplaceTableColType (COMMA createOrReplaceTableColType)*
+    ;
+
+createOrReplaceTableColType
+    : colName=errorCapturingIdentifier dataType (NOT NULL)? defaultExpression? commentSpec?
+    ;
+
 complexColTypeList
     : complexColType (COMMA complexColType)*
     ;
@@ -1028,6 +1040,8 @@ alterColumnAction
     | commentSpec
     | colPosition
     | setOrDrop=(SET | DROP) NOT NULL
+    | SET defaultExpression
+    | dropDefault=DROP DEFAULT
     ;
 
 
@@ -1086,6 +1100,7 @@ ansiNonReserved
     | DATE_DIFF
     | DAY
     | DBPROPERTIES
+    | DEFAULT
     | DEFINED
     | DELETE
     | DELIMITED
@@ -1338,6 +1353,7 @@ nonReserved
     | DATE_DIFF
     | DAY
     | DBPROPERTIES
+    | DEFAULT
     | DEFINED
     | DELETE
     | DELIMITED
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 64d5486..4619a3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2729,6 +2729,13 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   }
 
   /**
+   * Create top level table schema.
+   */
+  protected def createSchema(ctx: CreateOrReplaceTableColTypeListContext): StructType = {
+    StructType(Option(ctx).toSeq.flatMap(visitCreateOrReplaceTableColTypeList))
+  }
+
+  /**
    * Create a [[StructType]] from a number of column definitions.
    */
   override def visitColTypeList(ctx: ColTypeListContext): Seq[StructField] = withOrigin(ctx) {
@@ -2755,6 +2762,41 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   }
 
   /**
+   * Create a [[StructType]] from a number of CREATE TABLE column definitions.
+   */
+  override def visitCreateOrReplaceTableColTypeList(
+      ctx: CreateOrReplaceTableColTypeListContext): Seq[StructField] = withOrigin(ctx) {
+    ctx.createOrReplaceTableColType().asScala.map(visitCreateOrReplaceTableColType).toSeq
+  }
+
+  /**
+   * Create a top level [[StructField]] from a CREATE TABLE column definition.
+   */
+  override def visitCreateOrReplaceTableColType(
+      ctx: CreateOrReplaceTableColTypeContext): StructField = withOrigin(ctx) {
+    import ctx._
+
+    val builder = new MetadataBuilder
+    // Add comment to metadata
+    Option(commentSpec()).map(visitCommentSpec).foreach {
+      builder.putString("comment", _)
+    }
+
+    // Process the 'DEFAULT expression' clause in the column definition, if any.
+    val name: String = colName.getText
+    val defaultExpr = Option(ctx.defaultExpression()).map(visitDefaultExpression)
+    if (defaultExpr.isDefined) {
+      throw QueryParsingErrors.defaultColumnNotImplementedYetError(ctx)
+    }
+
+    StructField(
+      name = name,
+      dataType = typedVisit[DataType](ctx.dataType),
+      nullable = NULL == null,
+      metadata = builder.build())
+  }
+
+  /**
    * Create a [[StructType]] from a sequence of [[StructField]]s.
    */
   protected def createStructType(ctx: ComplexColTypeListContext): StructType = {
@@ -3457,7 +3499,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
 
-    val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
+    val columns = Option(ctx.createOrReplaceTableColTypeList())
+      .map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
     val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
     val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
       visitCreateTableClauses(ctx.createTableClauses())
@@ -3536,7 +3579,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     val orCreate = ctx.replaceTableHeader().CREATE() != null
     val (partTransforms, partCols, bucketSpec, properties, options, location, comment, serdeInfo) =
       visitCreateTableClauses(ctx.createTableClauses())
-    val columns = Option(ctx.colTypeList()).map(visitColTypeList).getOrElse(Nil)
+    val columns = Option(ctx.createOrReplaceTableColTypeList())
+      .map(visitCreateOrReplaceTableColTypeList).getOrElse(Nil)
     val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText)
 
     if (provider.isDefined && serdeInfo.isDefined) {
@@ -3655,6 +3699,10 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
   override def visitQualifiedColTypeWithPosition(
       ctx: QualifiedColTypeWithPositionContext): QualifiedColType = withOrigin(ctx) {
     val name = typedVisit[Seq[String]](ctx.name)
+    val defaultExpr = Option(ctx.defaultExpression()).map(visitDefaultExpression)
+    if (defaultExpr.isDefined) {
+      throw QueryParsingErrors.defaultColumnNotImplementedYetError(ctx)
+    }
     QualifiedColType(
       path = if (name.length > 1) Some(UnresolvedFieldName(name.init)) else None,
       colName = name.last,
@@ -3743,6 +3791,12 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     } else {
       None
     }
+    if (action.defaultExpression != null) {
+      throw QueryParsingErrors.defaultColumnNotImplementedYetError(ctx)
+    }
+    if (action.dropDefault != null) {
+      throw QueryParsingErrors.defaultColumnNotImplementedYetError(ctx)
+    }
 
     assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1)
 
@@ -3811,6 +3865,9 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
           throw QueryParsingErrors.operationInHiveStyleCommandUnsupportedError(
             "Replacing with a nested column", "REPLACE COLUMNS", ctx)
         }
+        if (Option(colType.defaultExpression()).map(visitDefaultExpression).isDefined) {
+          throw QueryParsingErrors.defaultColumnNotImplementedYetError(ctx)
+        }
         col
       }.toSeq
     )
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
index 6d7ed7b..96bcc18 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryParsingErrors.scala
@@ -436,4 +436,8 @@ object QueryParsingErrors {
     new ParseException(
       s"DROP TEMPORARY FUNCTION requires a single part name but got: ${name.quoted}", ctx)
   }
+
+  def defaultColumnNotImplementedYetError(ctx: ParserRuleContext): Throwable = {
+    new ParseException("Support for DEFAULT column values is not implemented yet", ctx)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 956b70a..ebd9ac8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -2235,4 +2235,57 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(parsePlan(timestampTypeSql), insertPartitionPlan(timestamp))
     comparePlans(parsePlan(binaryTypeSql), insertPartitionPlan(binaryStr))
   }
+
+  test("SPARK-38335: Implement parser support for DEFAULT values for columns in tables") {
+    // The following commands will support DEFAULT columns, but this has not been implemented yet.
+    for (sql <- Seq(
+      "ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42",
+      "ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT 42",
+      "ALTER TABLE t1 ALTER COLUMN a.b.c DROP DEFAULT",
+      "ALTER TABLE t1 REPLACE COLUMNS (x STRING DEFAULT 42)",
+      "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL DEFAULT \"abc\") USING parquet",
+      "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL DEFAULT \"xyz\") USING parquet"
+    )) {
+      val exc = intercept[ParseException] {
+        parsePlan(sql);
+      }
+      assert(exc.getMessage.contains("Support for DEFAULT column values is not implemented yet"));
+    }
+    // In each of the following cases, the DEFAULT reference parses as an unresolved attribute
+    // reference. We can handle these cases after the parsing stage, at later phases of analysis.
+    comparePlans(parsePlan("VALUES (1, 2, DEFAULT) AS val"),
+      SubqueryAlias("val",
+        UnresolvedInlineTable(Seq("col1", "col2", "col3"), Seq(Seq(Literal(1), Literal(2),
+          UnresolvedAttribute("DEFAULT"))))))
+    comparePlans(parsePlan(
+      "INSERT INTO t PARTITION(part = date'2019-01-02') VALUES ('a', DEFAULT)"),
+      InsertIntoStatement(
+        UnresolvedRelation(Seq("t")),
+        Map("part" -> Some("2019-01-02")),
+        userSpecifiedCols = Seq.empty[String],
+        query = UnresolvedInlineTable(Seq("col1", "col2"), Seq(Seq(Literal("a"),
+          UnresolvedAttribute("DEFAULT")))),
+        overwrite = false, ifPartitionNotExists = false))
+    parseCompare(
+      """
+        |MERGE INTO testcat1.ns1.ns2.tbl AS target
+        |USING testcat2.ns1.ns2.tbl AS source
+        |ON target.col1 = source.col1
+        |WHEN MATCHED AND (target.col2='delete') THEN DELETE
+        |WHEN MATCHED AND (target.col2='update') THEN UPDATE SET target.col2 = DEFAULT
+        |WHEN NOT MATCHED AND (target.col2='insert')
+        |THEN INSERT (target.col1, target.col2) VALUES (source.col1, DEFAULT)
+      """.stripMargin,
+      MergeIntoTable(
+        SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
+        SubqueryAlias("source", UnresolvedRelation(Seq("testcat2", "ns1", "ns2", "tbl"))),
+        EqualTo(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
+        Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))),
+          UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update"))),
+            Seq(Assignment(UnresolvedAttribute("target.col2"),
+              UnresolvedAttribute("DEFAULT"))))),
+        Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert"))),
+          Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
+            Assignment(UnresolvedAttribute("target.col2"), UnresolvedAttribute("DEFAULT")))))))
+  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fed02dd..a4e72e0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -318,7 +318,7 @@ class SparkSqlAstBuilder extends AstBuilder {
       val (_, _, _, _, options, location, _, _) = visitCreateTableClauses(ctx.createTableClauses())
       val provider = Option(ctx.tableProvider).map(_.multipartIdentifier.getText).getOrElse(
         throw QueryParsingErrors.createTempTableNotSpecifyProviderError(ctx))
-      val schema = Option(ctx.colTypeList()).map(createSchema)
+      val schema = Option(ctx.createOrReplaceTableColTypeList()).map(createSchema)
 
       logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
           "CREATE TEMPORARY VIEW ... USING ... instead")
