You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by gu...@apache.org on 2020/01/10 01:35:15 UTC

[spark] branch master updated: [SPARK-30439][SQL] Support non-nullable column in CREATE TABLE, ADD COLUMN and ALTER TABLE

This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ec03556 [SPARK-30439][SQL] Support non-nullable column in CREATE TABLE, ADD COLUMN and ALTER TABLE
0ec03556 is described below

commit 0ec0355611e7ce79599f86862a90611f7cde6227
Author: Wenchen Fan <we...@databricks.com>
AuthorDate: Fri Jan 10 10:34:46 2020 +0900

    [SPARK-30439][SQL] Support non-nullable column in CREATE TABLE, ADD COLUMN and ALTER TABLE
    
    ### What changes were proposed in this pull request?
    
    Allow users to specify NOT NULL in CREATE TABLE and ADD COLUMN column definition, and add a new SQL syntax to alter column nullability: ALTER TABLE ... ALTER COLUMN SET/DROP NOT NULL. This is a SQL standard syntax:
    ```
    <alter column definition> ::=
      ALTER [ COLUMN ] <column name> <alter column action>
    
    <alter column action> ::=
        <set column default clause>
      | <drop column default clause>
      | <set column not null clause>
      | <drop column not null clause>
      | ...
    
    <set column not null clause> ::=
      SET NOT NULL
    
    <drop column not null clause> ::=
      DROP NOT NULL
    ```
    
    ### Why are the changes needed?
    
    Previously we don't support it because the table schema in hive catalog are always nullable. Since we have catalog plugin now, it makes more sense to support NOT NULL at spark side, and let catalog implementations to decide if they support it or not.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, this is a new feature
    
    ### How was this patch tested?
    
    new tests
    
    Closes #27110 from cloud-fan/nullable.
    
    Authored-by: Wenchen Fan <we...@databricks.com>
    Signed-off-by: HyukjinKwon <gu...@apache.org>
---
 .../apache/spark/sql/catalyst/parser/SqlBase.g4    |  9 ++-
 .../spark/sql/connector/catalog/TableChange.java   | 69 ++++++++++++++++------
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  9 ++-
 .../sql/catalyst/analysis/ResolveCatalogs.scala    | 25 ++++----
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 65 +++++++++++++++-----
 .../sql/catalyst/plans/logical/statements.scala    |  2 +
 .../sql/connector/catalog/CatalogV2Util.scala      | 11 ++--
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 68 ++++++++++++++++-----
 .../sql/connector/catalog/TableCatalogSuite.scala  | 21 +------
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 55 ++++++++++-------
 .../spark/sql/connector/AlterTableTests.scala      | 39 +++++++++++-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 12 ++--
 .../datasources/v2/V2SessionCatalogSuite.scala     | 21 +------
 13 files changed, 273 insertions(+), 133 deletions(-)

diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 751c782..287227e 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -161,6 +161,9 @@ statement
     | ALTER TABLE table=multipartIdentifier
         (ALTER | CHANGE) COLUMN? column=multipartIdentifier
         (TYPE dataType)? commentSpec? colPosition?                     #alterTableColumn
+    | ALTER TABLE table=multipartIdentifier
+        ALTER COLUMN? column=multipartIdentifier
+        setOrDrop=(SET | DROP) NOT NULL                                #alterColumnNullability
     | ALTER TABLE table=multipartIdentifier partitionSpec?
         CHANGE COLUMN?
         colName=multipartIdentifier colType colPosition?               #hiveChangeColumn
@@ -869,7 +872,7 @@ qualifiedColTypeWithPositionList
     ;
 
 qualifiedColTypeWithPosition
-    : name=multipartIdentifier dataType commentSpec? colPosition?
+    : name=multipartIdentifier dataType (NOT NULL)? commentSpec? colPosition?
     ;
 
 colTypeList
@@ -877,7 +880,7 @@ colTypeList
     ;
 
 colType
-    : colName=errorCapturingIdentifier dataType commentSpec?
+    : colName=errorCapturingIdentifier dataType (NOT NULL)? commentSpec?
     ;
 
 complexColTypeList
@@ -885,7 +888,7 @@ complexColTypeList
     ;
 
 complexColType
-    : identifier ':' dataType commentSpec?
+    : identifier ':' dataType (NOT NULL)? commentSpec?
     ;
 
 whenClause
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index 7834399..58a592c 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -168,25 +168,22 @@ public interface TableChange {
    * @return a TableChange for the update
    */
   static TableChange updateColumnType(String[] fieldNames, DataType newDataType) {
-    return new UpdateColumnType(fieldNames, newDataType, true);
+    return new UpdateColumnType(fieldNames, newDataType);
   }
 
   /**
-   * Create a TableChange for updating the type of a field.
+   * Create a TableChange for updating the nullability of a field.
    * <p>
-   * The field names are used to find the field to update.
+   * The name is used to find the field to update.
    * <p>
    * If the field does not exist, the change will result in an {@link IllegalArgumentException}.
    *
    * @param fieldNames field names of the column to update
-   * @param newDataType the new data type
+   * @param nullable the nullability
    * @return a TableChange for the update
    */
-  static TableChange updateColumnType(
-      String[] fieldNames,
-      DataType newDataType,
-      boolean isNullable) {
-    return new UpdateColumnType(fieldNames, newDataType, isNullable);
+  static TableChange updateColumnNullability(String[] fieldNames, boolean nullable) {
+    return new UpdateColumnNullability(fieldNames, nullable);
   }
 
   /**
@@ -488,12 +485,10 @@ public interface TableChange {
   final class UpdateColumnType implements ColumnChange {
     private final String[] fieldNames;
     private final DataType newDataType;
-    private final boolean isNullable;
 
-    private UpdateColumnType(String[] fieldNames, DataType newDataType, boolean isNullable) {
+    private UpdateColumnType(String[] fieldNames, DataType newDataType) {
       this.fieldNames = fieldNames;
       this.newDataType = newDataType;
-      this.isNullable = isNullable;
     }
 
     @Override
@@ -505,23 +500,59 @@ public interface TableChange {
       return newDataType;
     }
 
-    public boolean isNullable() {
-      return isNullable;
-    }
-
     @Override
     public boolean equals(Object o) {
       if (this == o) return true;
       if (o == null || getClass() != o.getClass()) return false;
       UpdateColumnType that = (UpdateColumnType) o;
-      return isNullable == that.isNullable &&
-        Arrays.equals(fieldNames, that.fieldNames) &&
+      return Arrays.equals(fieldNames, that.fieldNames) &&
         newDataType.equals(that.newDataType);
     }
 
     @Override
     public int hashCode() {
-      int result = Objects.hash(newDataType, isNullable);
+      int result = Objects.hash(newDataType);
+      result = 31 * result + Arrays.hashCode(fieldNames);
+      return result;
+    }
+  }
+
+  /**
+   * A TableChange to update the nullability of a field.
+   * <p>
+   * The field names are used to find the field to update.
+   * <p>
+   * If the field does not exist, the change must result in an {@link IllegalArgumentException}.
+   */
+  final class UpdateColumnNullability implements ColumnChange {
+    private final String[] fieldNames;
+    private final boolean nullable;
+
+    private UpdateColumnNullability(String[] fieldNames, boolean nullable) {
+      this.fieldNames = fieldNames;
+      this.nullable = nullable;
+    }
+
+    public String[] fieldNames() {
+      return fieldNames;
+    }
+
+    public boolean nullable() {
+      return nullable;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      UpdateColumnNullability that = (UpdateColumnNullability) o;
+      return nullable == that.nullable &&
+        Arrays.equals(fieldNames, that.fieldNames);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(nullable);
       result = 31 * result + Arrays.hashCode(fieldNames);
       return result;
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 8eba664..bd50a6b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnType}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 
@@ -466,6 +466,13 @@ trait CheckAnalysis extends PredicateHelper {
                         s"${field.dataType.simpleString} cannot be cast to " +
                         s"${update.newDataType.simpleString}")
                 }
+              case update: UpdateColumnNullability =>
+                val field = findField("update", update.fieldNames)
+                val fieldName = update.fieldNames.quoted
+                if (!update.nullable && field.nullable) {
+                  throw new AnalysisException(
+                    s"Cannot change nullable column to non-nullable: $fieldName")
+                }
               case rename: RenameColumn =>
                 findField("rename", rename.fieldNames)
               case update: UpdateColumnComment =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index 5a7e0a0..034e30d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -38,29 +38,32 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
         TableChange.addColumn(
           col.name.toArray,
           col.dataType,
-          true,
+          col.nullable,
           col.comment.orNull,
           col.position.orNull)
       }
       createAlterTable(nameParts, catalog, tbl, changes)
 
-    case AlterTableAlterColumnStatement(
-         nameParts @ NonSessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) =>
-      val colNameArray = colName.toArray
-      val typeChange = dataType.map { newDataType =>
-        TableChange.updateColumnType(colNameArray, newDataType, true)
+    case a @ AlterTableAlterColumnStatement(
+         nameParts @ NonSessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
+      val colName = a.column.toArray
+      val typeChange = a.dataType.map { newDataType =>
+        TableChange.updateColumnType(colName, newDataType)
       }
-      val commentChange = comment.map { newComment =>
-        TableChange.updateColumnComment(colNameArray, newComment)
+      val nullabilityChange = a.nullable.map { nullable =>
+        TableChange.updateColumnNullability(colName, nullable)
       }
-      val positionChange = pos.map { newPosition =>
-        TableChange.updateColumnPosition(colNameArray, newPosition)
+      val commentChange = a.comment.map { newComment =>
+        TableChange.updateColumnComment(colName, newComment)
+      }
+      val positionChange = a.position.map { newPosition =>
+        TableChange.updateColumnPosition(colName, newPosition)
       }
       createAlterTable(
         nameParts,
         catalog,
         tbl,
-        typeChange.toSeq ++ commentChange ++ positionChange)
+        typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
 
     case AlterTableRenameColumnStatement(
          nameParts @ NonSessionCatalogAndTable(catalog, tbl), col, newName) =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7f66dbf..27647a5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -2239,10 +2239,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
 
     StructField(
-      colName.getText,
-      cleanedDataType,
-      nullable = true,
-      builder.build())
+      name = colName.getText,
+      dataType = cleanedDataType,
+      nullable = NULL == null,
+      metadata = builder.build())
   }
 
   /**
@@ -2265,7 +2265,10 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    */
   override def visitComplexColType(ctx: ComplexColTypeContext): StructField = withOrigin(ctx) {
     import ctx._
-    val structField = StructField(identifier.getText, typedVisit(dataType), nullable = true)
+    val structField = StructField(
+      name = identifier.getText,
+      dataType = typedVisit(dataType()),
+      nullable = NULL == null)
     Option(commentSpec).map(visitCommentSpec).map(structField.withComment).getOrElse(structField)
   }
 
@@ -2858,10 +2861,11 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
   override def visitQualifiedColTypeWithPosition(
       ctx: QualifiedColTypeWithPositionContext): QualifiedColType = withOrigin(ctx) {
     QualifiedColType(
-      typedVisit[Seq[String]](ctx.name),
-      typedVisit[DataType](ctx.dataType),
-      Option(ctx.commentSpec()).map(visitCommentSpec),
-      Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
+      name = typedVisit[Seq[String]](ctx.name),
+      dataType = typedVisit[DataType](ctx.dataType),
+      nullable = ctx.NULL == null,
+      comment = Option(ctx.commentSpec()).map(visitCommentSpec),
+      position = Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
   }
 
   /**
@@ -2917,9 +2921,35 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     AlterTableAlterColumnStatement(
       visitMultipartIdentifier(ctx.table),
       typedVisit[Seq[String]](ctx.column),
-      Option(ctx.dataType).map(typedVisit[DataType]),
-      Option(ctx.commentSpec()).map(visitCommentSpec),
-      Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
+      dataType = Option(ctx.dataType).map(typedVisit[DataType]),
+      nullable = None,
+      comment = Option(ctx.commentSpec()).map(visitCommentSpec),
+      position = Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
+  }
+
+  /**
+   * Parse a [[AlterTableAlterColumnStatement]] command to change column nullability.
+   *
+   * For example:
+   * {{{
+   *   ALTER TABLE table1 ALTER COLUMN a.b.c SET NOT NULL
+   *   ALTER TABLE table1 ALTER COLUMN a.b.c DROP NOT NULL
+   * }}}
+   */
+  override def visitAlterColumnNullability(ctx: AlterColumnNullabilityContext): LogicalPlan = {
+    withOrigin(ctx) {
+      val nullable = ctx.setOrDrop.getType match {
+        case SqlBaseParser.SET => false
+        case SqlBaseParser.DROP => true
+      }
+      AlterTableAlterColumnStatement(
+        visitMultipartIdentifier(ctx.table),
+        typedVisit[Seq[String]](ctx.column),
+        dataType = None,
+        nullable = Some(nullable),
+        comment = None,
+        position = None)
+    }
   }
 
   /**
@@ -2941,13 +2971,18 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
       throw new AnalysisException("Renaming column is not supported in Hive-style ALTER COLUMN, " +
         "please run RENAME COLUMN instead.")
     }
+    if (ctx.colType.NULL != null) {
+      throw new AnalysisException("NOT NULL is not supported in Hive-style ALTER COLUMN, " +
+        "please run ALTER COLUMN ... SET/DROP NOT NULL instead.")
+    }
 
     AlterTableAlterColumnStatement(
       typedVisit[Seq[String]](ctx.table),
       columnNameParts,
-      Option(ctx.colType().dataType()).map(typedVisit[DataType]),
-      Option(ctx.colType().commentSpec()).map(visitCommentSpec),
-      Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
+      dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
+      nullable = None,
+      comment = Option(ctx.colType().commentSpec()).map(visitCommentSpec),
+      position = Option(ctx.colPosition).map(typedVisit[ColumnPosition]))
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index de3d858..476ecf5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -145,6 +145,7 @@ case class ReplaceTableAsSelectStatement(
 case class QualifiedColType(
     name: Seq[String],
     dataType: DataType,
+    nullable: Boolean,
     comment: Option[String],
     position: Option[ColumnPosition])
 
@@ -162,6 +163,7 @@ case class AlterTableAlterColumnStatement(
     tableName: Seq[String],
     column: Seq[String],
     dataType: Option[DataType],
+    nullable: Option[Boolean],
     comment: Option[String],
     position: Option[ColumnPosition]) extends ParsedStatement
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 671beb3..b28e3e6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -127,11 +127,12 @@ private[sql] object CatalogV2Util {
 
         case update: UpdateColumnType =>
           replace(schema, update.fieldNames, field => {
-            if (!update.isNullable && field.nullable) {
-              throw new IllegalArgumentException(
-                s"Cannot change optional column to required: $field.name")
-            }
-            Some(StructField(field.name, update.newDataType, update.isNullable, field.metadata))
+            Some(field.copy(dataType = update.newDataType))
+          })
+
+        case update: UpdateColumnNullability =>
+          replace(schema, update.fieldNames, field => {
+            Some(field.copy(nullable = update.nullable))
           })
 
         case update: UpdateColumnComment =>
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 2e81520..9309daa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -70,13 +70,13 @@ class DDLParserSuite extends AnalysisTest {
   }
 
   test("create/replace table using - schema") {
-    val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
-    val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING) USING parquet"
+    val createSql = "CREATE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL) USING parquet"
+    val replaceSql = "REPLACE TABLE my_tab(a INT COMMENT 'test', b STRING NOT NULL) USING parquet"
     val expectedTableSpec = TableSpec(
       Seq("my_tab"),
       Some(new StructType()
         .add("a", IntegerType, nullable = true, "test")
-        .add("b", StringType)),
+        .add("b", StringType, nullable = false)),
       Seq.empty[Transform],
       None,
       Map.empty[String, String],
@@ -492,7 +492,7 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None, None)
+        QualifiedColType(Seq("x"), IntegerType, true, None, None)
       )))
   }
 
@@ -500,8 +500,8 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS x int, y string"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None, None),
-        QualifiedColType(Seq("y"), StringType, None, None)
+        QualifiedColType(Seq("x"), IntegerType, true, None, None),
+        QualifiedColType(Seq("y"), StringType, true, None, None)
       )))
   }
 
@@ -509,7 +509,7 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS x int"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None, None)
+        QualifiedColType(Seq("x"), IntegerType, true, None, None)
       )))
   }
 
@@ -517,7 +517,7 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS (x int)"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None, None)
+        QualifiedColType(Seq("x"), IntegerType, true, None, None)
       )))
   }
 
@@ -525,7 +525,15 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMNS (x int COMMENT 'doc')"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, Some("doc"), None)
+        QualifiedColType(Seq("x"), IntegerType, true, Some("doc"), None)
+      )))
+  }
+
+  test("alter table: add non-nullable column") {
+    comparePlans(
+      parsePlan("ALTER TABLE table_name ADD COLUMN x int NOT NULL"),
+      AlterTableAddColumnsStatement(Seq("table_name"), Seq(
+        QualifiedColType(Seq("x"), IntegerType, false, None, None)
       )))
   }
 
@@ -533,7 +541,7 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int COMMENT 'doc'"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, Some("doc"), None)
+        QualifiedColType(Seq("x"), IntegerType, true, Some("doc"), None)
       )))
   }
 
@@ -541,13 +549,13 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int FIRST"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None, Some(first()))
+        QualifiedColType(Seq("x"), IntegerType, true, None, Some(first()))
       )))
 
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x int AFTER y"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x"), IntegerType, None, Some(after("y")))
+        QualifiedColType(Seq("x"), IntegerType, true, None, Some(after("y")))
       )))
   }
 
@@ -555,7 +563,7 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc'"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"), None)
+        QualifiedColType(Seq("x", "y", "z"), IntegerType, true, Some("doc"), None)
       )))
   }
 
@@ -563,8 +571,8 @@ class DDLParserSuite extends AnalysisTest {
     comparePlans(
       parsePlan("ALTER TABLE table_name ADD COLUMN x.y.z int COMMENT 'doc', a.b string FIRST"),
       AlterTableAddColumnsStatement(Seq("table_name"), Seq(
-        QualifiedColType(Seq("x", "y", "z"), IntegerType, Some("doc"), None),
-        QualifiedColType(Seq("a", "b"), StringType, None, Some(first()))
+        QualifiedColType(Seq("x", "y", "z"), IntegerType, true, Some("doc"), None),
+        QualifiedColType(Seq("a", "b"), StringType, true, None, Some(first()))
       )))
   }
 
@@ -598,6 +606,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq("a", "b", "c"),
         Some(LongType),
         None,
+        None,
         None))
   }
 
@@ -609,6 +618,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq("a", "b", "c"),
         Some(LongType),
         None,
+        None,
         None))
   }
 
@@ -619,6 +629,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq("table_name"),
         Seq("a", "b", "c"),
         None,
+        None,
         Some("new comment"),
         None))
   }
@@ -631,6 +642,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq("a", "b", "c"),
         None,
         None,
+        None,
         Some(first())))
   }
 
@@ -642,10 +654,33 @@ class DDLParserSuite extends AnalysisTest {
         Seq("table_name"),
         Seq("a", "b", "c"),
         Some(LongType),
+        None,
         Some("new comment"),
         Some(after("d"))))
   }
 
+  test("alter table: SET/DROP NOT NULL") {
+    comparePlans(
+      parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"),
+      AlterTableAlterColumnStatement(
+        Seq("table_name"),
+        Seq("a", "b", "c"),
+        None,
+        Some(false),
+        None,
+        None))
+
+    comparePlans(
+      parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"),
+      AlterTableAlterColumnStatement(
+        Seq("table_name"),
+        Seq("a", "b", "c"),
+        None,
+        Some(true),
+        None,
+        None))
+  }
+
   test("alter table: drop column") {
     comparePlans(
       parsePlan("ALTER TABLE table_name DROP COLUMN a.b.c"),
@@ -675,6 +710,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq("a", "b", "c"),
         Some(IntegerType),
         None,
+        None,
         None))
 
     comparePlans(
@@ -683,6 +719,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq("table_name"),
         Seq("a", "b", "c"),
         Some(IntegerType),
+        None,
         Some("new_comment"),
         None))
 
@@ -693,6 +730,7 @@ class DDLParserSuite extends AnalysisTest {
         Seq("a", "b", "c"),
         Some(IntegerType),
         None,
+        None,
         Some(after("other_col"))))
 
     // renaming column not supported in hive style ALTER COLUMN.
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
index ec33a16..a4c85ec 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TableCatalogSuite.scala
@@ -368,7 +368,7 @@ class TableCatalogSuite extends SparkFunSuite {
     assert(updated.schema == expectedSchema)
   }
 
-  test("alterTable: update column data type and nullability") {
+  test("alterTable: update column nullability") {
     val catalog = newCatalog()
 
     val originalSchema = new StructType()
@@ -379,27 +379,12 @@ class TableCatalogSuite extends SparkFunSuite {
     assert(table.schema == originalSchema)
 
     val updated = catalog.alterTable(testIdent,
-      TableChange.updateColumnType(Array("id"), LongType, true))
+      TableChange.updateColumnNullability(Array("id"), true))
 
-    val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
+    val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
     assert(updated.schema == expectedSchema)
   }
 
-  test("alterTable: update optional column to required fails") {
-    val catalog = newCatalog()
-
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
-
-    assert(table.schema == schema)
-
-    val exc = intercept[IllegalArgumentException] {
-      catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType, false))
-    }
-
-    assert(exc.getMessage.contains("Cannot change optional column to required"))
-    assert(exc.getMessage.contains("id"))
-  }
-
   test("alterTable: update missing column fails") {
     val catalog = newCatalog()
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 9f0567d..c432528 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -51,66 +51,79 @@ class ResolveSessionCatalog(
          nameParts @ SessionCatalogAndTable(catalog, tbl), cols) =>
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
-          cols.foreach(c => assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand"))
+          cols.foreach { c =>
+            assertTopLevelColumn(c.name, "AlterTableAddColumnsCommand")
+            if (!c.nullable) {
+              throw new AnalysisException(
+                "ADD COLUMN with v1 tables cannot specify NOT NULL.")
+            }
+          }
           AlterTableAddColumnsCommand(tbl.asTableIdentifier, cols.map(convertToStructField))
       }.getOrElse {
         val changes = cols.map { col =>
           TableChange.addColumn(
             col.name.toArray,
             col.dataType,
-            true,
+            col.nullable,
             col.comment.orNull,
             col.position.orNull)
         }
         createAlterTable(nameParts, catalog, tbl, changes)
       }
 
-    case AlterTableAlterColumnStatement(
-         nameParts @ SessionCatalogAndTable(catalog, tbl), colName, dataType, comment, pos) =>
+    case a @ AlterTableAlterColumnStatement(
+         nameParts @ SessionCatalogAndTable(catalog, tbl), _, _, _, _, _) =>
       loadTable(catalog, tbl.asIdentifier).collect {
         case v1Table: V1Table =>
-          if (colName.length > 1) {
+          if (a.column.length > 1) {
             throw new AnalysisException(
               "ALTER COLUMN with qualified column is only supported with v2 tables.")
           }
-          if (dataType.isEmpty) {
+          if (a.dataType.isEmpty) {
             throw new AnalysisException(
               "ALTER COLUMN with v1 tables must specify new data type.")
           }
-          if (pos.isDefined) {
+          if (a.nullable.isDefined) {
+            throw new AnalysisException(
+              "ALTER COLUMN with v1 tables cannot specify NOT NULL.")
+          }
+          if (a.position.isDefined) {
             throw new AnalysisException("" +
               "ALTER COLUMN ... FIRST | ALTER is only supported with v2 tables.")
           }
           val builder = new MetadataBuilder
           // Add comment to metadata
-          comment.map(c => builder.putString("comment", c))
+          a.comment.map(c => builder.putString("comment", c))
           // Add Hive type string to metadata.
-          val cleanedDataType = HiveStringType.replaceCharType(dataType.get)
-          if (dataType.get != cleanedDataType) {
-            builder.putString(HIVE_TYPE_STRING, dataType.get.catalogString)
+          val cleanedDataType = HiveStringType.replaceCharType(a.dataType.get)
+          if (a.dataType.get != cleanedDataType) {
+            builder.putString(HIVE_TYPE_STRING, a.dataType.get.catalogString)
           }
           val newColumn = StructField(
-            colName(0),
+            a.column(0),
             cleanedDataType,
             nullable = true,
             builder.build())
-          AlterTableChangeColumnCommand(tbl.asTableIdentifier, colName(0), newColumn)
+          AlterTableChangeColumnCommand(tbl.asTableIdentifier, a.column(0), newColumn)
       }.getOrElse {
-        val colNameArray = colName.toArray
-        val typeChange = dataType.map { newDataType =>
-          TableChange.updateColumnType(colNameArray, newDataType, true)
+        val colName = a.column.toArray
+        val typeChange = a.dataType.map { newDataType =>
+          TableChange.updateColumnType(colName, newDataType)
+        }
+        val nullabilityChange = a.nullable.map { nullable =>
+          TableChange.updateColumnNullability(colName, nullable)
         }
-        val commentChange = comment.map { newComment =>
-          TableChange.updateColumnComment(colNameArray, newComment)
+        val commentChange = a.comment.map { newComment =>
+          TableChange.updateColumnComment(colName, newComment)
         }
-        val positionChange = pos.map { newPosition =>
-          TableChange.updateColumnPosition(colNameArray, newPosition)
+        val positionChange = a.position.map { newPosition =>
+          TableChange.updateColumnPosition(colName, newPosition)
         }
         createAlterTable(
           nameParts,
           catalog,
           tbl,
-          typeChange.toSeq ++ commentChange ++ positionChange)
+          typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange)
       }
 
     case AlterTableRenameColumnStatement(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 2ba3c99..d304d5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -86,6 +86,21 @@ trait AlterTableTests extends SharedSparkSession {
     }
   }
 
+  test("AlterTable: add column with NOT NULL") {
+    val t = s"${catalogAndNamespace}table_name"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id int) USING $v2Format")
+      sql(s"ALTER TABLE $t ADD COLUMN data string NOT NULL")
+
+      val table = getTableMetadata(t)
+
+      assert(table.name === fullTableName(t))
+      assert(table.schema === StructType(Seq(
+        StructField("id", IntegerType),
+        StructField("data", StringType, nullable = false))))
+    }
+  }
+
   test("AlterTable: add column with comment") {
     val t = s"${catalogAndNamespace}table_name"
     withTable(t) {
@@ -290,12 +305,33 @@ trait AlterTableTests extends SharedSparkSession {
       sql(s"ALTER TABLE $t ALTER COLUMN id TYPE bigint")
 
       val table = getTableMetadata(t)
-
       assert(table.name === fullTableName(t))
       assert(table.schema === new StructType().add("id", LongType))
     }
   }
 
+  test("AlterTable: SET/DROP NOT NULL") {
+    val t = s"${catalogAndNamespace}table_name"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint NOT NULL) USING $v2Format")
+      sql(s"ALTER TABLE $t ALTER COLUMN id SET NOT NULL")
+
+      val table = getTableMetadata(t)
+      assert(table.name === fullTableName(t))
+      assert(table.schema === new StructType().add("id", LongType, nullable = false))
+
+      sql(s"ALTER TABLE $t ALTER COLUMN id DROP NOT NULL")
+      val table2 = getTableMetadata(t)
+      assert(table2.name === fullTableName(t))
+      assert(table2.schema === new StructType().add("id", LongType))
+
+      val e = intercept[AnalysisException] {
+        sql(s"ALTER TABLE $t ALTER COLUMN id SET NOT NULL")
+      }
+      assert(e.message.contains("Cannot change nullable column to non-nullable"))
+    }
+  }
+
   test("AlterTable: update nested type float -> double") {
     val t = s"${catalogAndNamespace}table_name"
     withTable(t) {
@@ -303,7 +339,6 @@ trait AlterTableTests extends SharedSparkSession {
       sql(s"ALTER TABLE $t ALTER COLUMN point.x TYPE double")
 
       val table = getTableMetadata(t)
-
       assert(table.name === fullTableName(t))
       assert(table.schema === new StructType()
         .add("id", IntegerType)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index ccf853f..e6da822 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAM
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.sources.SimpleScanSource
-import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructType}
+import org.apache.spark.sql.types.{BooleanType, LongType, StringType, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 class DataSourceV2SQLSuite
@@ -86,7 +86,7 @@ class DataSourceV2SQLSuite
   }
 
   test("CreateTable: use v2 plan because catalog is set") {
-    spark.sql("CREATE TABLE testcat.table_name (id bigint, data string) USING foo")
+    spark.sql("CREATE TABLE testcat.table_name (id bigint NOT NULL, data string) USING foo")
 
     val testCatalog = catalog("testcat").asTableCatalog
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
@@ -94,7 +94,9 @@ class DataSourceV2SQLSuite
     assert(table.name == "testcat.table_name")
     assert(table.partitioning.isEmpty)
     assert(table.properties == Map("provider" -> "foo").asJava)
-    assert(table.schema == new StructType().add("id", LongType).add("data", StringType))
+    assert(table.schema == new StructType()
+      .add("id", LongType, nullable = false)
+      .add("data", StringType))
 
     val rdd = spark.sparkContext.parallelize(table.asInstanceOf[InMemoryTable].rows)
     checkAnswer(spark.internalCreateDataFrame(rdd, table.schema), Seq.empty)
@@ -361,14 +363,14 @@ class DataSourceV2SQLSuite
     val table = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
     assert(table.asInstanceOf[InMemoryTable].rows.nonEmpty)
 
-    spark.sql("REPLACE TABLE testcat.table_name (id bigint) USING foo")
+    spark.sql("REPLACE TABLE testcat.table_name (id bigint NOT NULL) USING foo")
     val replaced = testCatalog.loadTable(Identifier.of(Array(), "table_name"))
 
     assert(replaced.asInstanceOf[InMemoryTable].rows.isEmpty,
         "Replaced table should have no rows after committing.")
     assert(replaced.schema().fields.length === 1,
         "Replaced table should have new schema.")
-    assert(replaced.schema().fields(0).name === "id",
+    assert(replaced.schema().fields(0) === StructField("id", LongType, nullable = false),
       "Replaced table should have new schema.")
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index a029988..94b2bde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -391,7 +391,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(updated.schema == expectedSchema)
   }
 
-  test("alterTable: update column data type and nullability") {
+  test("alterTable: update column nullability") {
     val catalog = newCatalog()
 
     val originalSchema = new StructType()
@@ -402,27 +402,12 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite {
     assert(table.schema == originalSchema)
 
     val updated = catalog.alterTable(testIdent,
-      TableChange.updateColumnType(Array("id"), LongType, true))
+      TableChange.updateColumnNullability(Array("id"), true))
 
-    val expectedSchema = new StructType().add("id", LongType).add("data", StringType)
+    val expectedSchema = new StructType().add("id", IntegerType).add("data", StringType)
     assert(updated.schema == expectedSchema)
   }
 
-  test("alterTable: update optional column to required fails") {
-    val catalog = newCatalog()
-
-    val table = catalog.createTable(testIdent, schema, Array.empty, emptyProps)
-
-    assert(table.schema == schema)
-
-    val exc = intercept[IllegalArgumentException] {
-      catalog.alterTable(testIdent, TableChange.updateColumnType(Array("id"), LongType, false))
-    }
-
-    assert(exc.getMessage.contains("Cannot change optional column to required"))
-    assert(exc.getMessage.contains("id"))
-  }
-
   test("alterTable: update missing column fails") {
     val catalog = newCatalog()
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org