Posted to commits@spark.apache.org by ge...@apache.org on 2022/06/21 17:09:04 UTC

[spark] branch master updated: [SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ADD COLUMNS to V2 data sources

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new db0e972c09c [SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ADD COLUMNS to V2 data sources
db0e972c09c is described below

commit db0e972c09c7ef22f46a6302bf969d85921cf0cb
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Tue Jun 21 10:08:39 2022 -0700

    [SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ADD COLUMNS to V2 data sources
    
    ### What changes were proposed in this pull request?
    
    Extend DEFAULT column support in ALTER TABLE ADD COLUMNS commands to include V2 data sources.
    
    Example:
    
    ```
    > create or replace table t (a string default 'abc') using $v2Source
    > insert into t values (default)
    > alter table t add column (b string default 'def')
    > insert into t values ("ghi")
    > select * from t
    "abc", "def"
    "ghi", "def"
    ```

    Note that the first row, inserted before the ALTER TABLE, also returns 'def' for the new column: the default is recorded as an "existence default" in the column metadata, so scans return it for rows written before the column existed.
    
    ### Why are the changes needed?
    
    This makes V2 data sources easier to use and extend.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. ALTER TABLE ADD COLUMNS commands with DEFAULT values now succeed against V2 data sources, as shown in the example above.
    
    ### How was this patch tested?
    
    This PR includes new test coverage.
    
    Closes #36771 from dtenedor/default-cols-v2-tables.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 .../spark/sql/connector/catalog/TableChange.java   | 23 +++++++++-----
 .../plans/logical/v2AlterTableCommands.scala       |  6 ++--
 .../sql/connector/catalog/CatalogV2Util.scala      | 35 +++++++++++++++-------
 .../org/apache/spark/sql/types/StructField.scala   | 11 +++++++
 .../connector/catalog/InMemoryTableCatalog.scala   |  2 +-
 .../datasources/v2/V2SessionCatalog.scala          |  3 +-
 .../spark/sql/connector/AlterTableTests.scala      | 21 +++++++++++++
 .../DataSourceV2DataFrameSessionCatalogSuite.scala |  2 +-
 8 files changed, 81 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index 72dbf6ca07a..b0b686942c6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -79,7 +79,7 @@ public interface TableChange {
    * @return a TableChange for the addition
    */
   static TableChange addColumn(String[] fieldNames, DataType dataType) {
-    return new AddColumn(fieldNames, dataType, true, null, null);
+    return new AddColumn(fieldNames, dataType, true, null, null, null);
   }
 
   /**
@@ -95,7 +95,7 @@ public interface TableChange {
    * @return a TableChange for the addition
    */
   static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
-    return new AddColumn(fieldNames, dataType, isNullable, null, null);
+    return new AddColumn(fieldNames, dataType, isNullable, null, null, null);
   }
 
   /**
@@ -116,7 +116,7 @@ public interface TableChange {
       DataType dataType,
       boolean isNullable,
       String comment) {
-    return new AddColumn(fieldNames, dataType, isNullable, comment, null);
+    return new AddColumn(fieldNames, dataType, isNullable, comment, null, null);
   }
 
   /**
@@ -131,6 +131,7 @@ public interface TableChange {
    * @param isNullable whether the new column can contain null
    * @param comment the new field's comment string
   * @param position the new column's position
+   * @param defaultValue default value to return when scanning from the new column, if any
    * @return a TableChange for the addition
    */
   static TableChange addColumn(
@@ -138,8 +139,9 @@ public interface TableChange {
       DataType dataType,
       boolean isNullable,
       String comment,
-      ColumnPosition position) {
-    return new AddColumn(fieldNames, dataType, isNullable, comment, position);
+      ColumnPosition position,
+      String defaultValue) {
+    return new AddColumn(fieldNames, dataType, isNullable, comment, position, defaultValue);
   }
 
   /**
@@ -378,18 +380,21 @@ public interface TableChange {
     private final boolean isNullable;
     private final String comment;
     private final ColumnPosition position;
+    private final String defaultValue;
 
     private AddColumn(
         String[] fieldNames,
         DataType dataType,
         boolean isNullable,
         String comment,
-        ColumnPosition position) {
+        ColumnPosition position,
+        String defaultValue) {
       this.fieldNames = fieldNames;
       this.dataType = dataType;
       this.isNullable = isNullable;
       this.comment = comment;
       this.position = position;
+      this.defaultValue = defaultValue;
     }
 
     @Override
@@ -415,6 +420,9 @@ public interface TableChange {
       return position;
     }
 
+    @Nullable
+    public String defaultValue() { return defaultValue; }
+
     @Override
     public boolean equals(Object o) {
       if (this == o) return true;
@@ -424,7 +432,8 @@ public interface TableChange {
         Arrays.equals(fieldNames, addColumn.fieldNames) &&
         dataType.equals(addColumn.dataType) &&
         Objects.equals(comment, addColumn.comment) &&
-        Objects.equals(position, addColumn.position);
+        Objects.equals(position, addColumn.position) &&
+        Objects.equals(defaultValue, addColumn.defaultValue);
     }
 
     @Override
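
For illustration, a minimal Scala sketch (not part of this commit) of how a caller would construct the extended change through the new six-argument overload; the column name and default expression are taken from the new test further below:

```
import org.apache.spark.sql.connector.catalog.TableChange
import org.apache.spark.sql.types.IntegerType

// Add a nullable INT column "b" with no comment or position, carrying the
// unparsed default expression "2 + 3" via the parameter added in this commit.
val change: TableChange = TableChange.addColumn(
  Array("b"),  // fieldNames
  IntegerType, // dataType
  true,        // isNullable
  null,        // comment
  null,        // position
  "2 + 3")     // defaultValue (new parameter)
```
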
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index fa497658896..e72ce8d421b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -119,7 +119,8 @@ case class AddColumns(
         col.dataType,
         col.nullable,
         col.comment.orNull,
-        col.position.map(_.position).orNull)
+        col.position.map(_.position).orNull,
+        col.default.orNull)
     }
   }
 
@@ -154,7 +155,8 @@ case class ReplaceColumns(
         col.dataType,
         col.nullable,
         col.comment.orNull,
-        null)
+        null,
+        col.default.orNull)
     }
     deleteChanges ++ addChanges
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 4c174ad7c4f..d47ffcc2a71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
 import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -130,23 +131,34 @@ private[sql] object CatalogV2Util {
   /**
    * Apply schema changes to a schema and return the result.
    */
-  def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+  def applySchemaChanges(
+      schema: StructType,
+      changes: Seq[TableChange],
+      tableProvider: Option[String],
+      statementType: String): StructType = {
     changes.foldLeft(schema) { (schema, change) =>
       change match {
         case add: AddColumn =>
           add.fieldNames match {
             case Array(name) =>
               val field = StructField(name, add.dataType, nullable = add.isNullable)
-              val newField = Option(add.comment).map(field.withComment).getOrElse(field)
-              addField(schema, newField, add.position())
-
+              val fieldWithDefault: StructField =
+                Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+              val fieldWithComment: StructField =
+                Option(add.comment).map(fieldWithDefault.withComment).getOrElse(fieldWithDefault)
+              addField(schema, fieldWithComment, add.position(), tableProvider, statementType)
             case names =>
               replace(schema, names.init, parent => parent.dataType match {
                 case parentType: StructType =>
                   val field = StructField(names.last, add.dataType, nullable = add.isNullable)
-                  val newField = Option(add.comment).map(field.withComment).getOrElse(field)
-                  Some(parent.copy(dataType = addField(parentType, newField, add.position())))
-
+                  val fieldWithDefault: StructField =
+                    Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+                  val fieldWithComment: StructField =
+                    Option(add.comment).map(fieldWithDefault.withComment)
+                      .getOrElse(fieldWithDefault)
+                  Some(parent.copy(dataType =
+                    addField(parentType, fieldWithComment, add.position(), tableProvider,
+                      statementType)))
                 case _ =>
                   throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
               })
@@ -176,7 +188,7 @@ private[sql] object CatalogV2Util {
               throw new IllegalArgumentException("Field not found: " + name)
             }
             val withFieldRemoved = StructType(struct.fields.filter(_ != oldField))
-            addField(withFieldRemoved, oldField, update.position())
+            addField(withFieldRemoved, oldField, update.position(), tableProvider, statementType)
           }
 
           update.fieldNames() match {
@@ -204,8 +216,10 @@ private[sql] object CatalogV2Util {
   private def addField(
       schema: StructType,
       field: StructField,
-      position: ColumnPosition): StructType = {
-    if (position == null) {
+      position: ColumnPosition,
+      tableProvider: Option[String],
+      statementType: String): StructType = {
+    val newSchema: StructType = if (position == null) {
       schema.add(field)
     } else if (position.isInstanceOf[First]) {
       StructType(field +: schema.fields)
@@ -218,6 +232,7 @@ private[sql] object CatalogV2Util {
       val (before, after) = schema.fields.splitAt(fieldIndex + 1)
       StructType(before ++ (field +: after))
     }
+    constantFoldCurrentDefaultsToExistDefaults(newSchema, tableProvider, statementType)
   }
 
   private def replace(
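
CatalogV2Util is internal (private[sql]), so the following Scala sketch models what the catalog implementations below do with the new signature rather than a public API; the schema and change values are illustrative:

```
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableChange}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}

val before = new StructType().add("a", StringType)
val change = TableChange.addColumn(Array("b"), IntegerType, true, null, null, "2 + 3")

// The two new parameters carry the table's provider (None in the in-memory
// test catalogs) and the statement type, used when validating and
// constant-folding the default expression.
val after = CatalogV2Util.applySchemaChanges(before, Seq(change), None, "ALTER TABLE")
// Column "b" in the result carries both the current default ("2 + 3") and
// its constant-folded existence default ("5") in its metadata.
```
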
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
index 1fdde3e5219..1869ac641b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -118,6 +118,17 @@ case class StructField(
     }
   }
 
+  /**
+   * Updates the StructField with a new existence default value.
+   */
+  def withExistenceDefaultValue(value: String): StructField = {
+    val newMetadata = new MetadataBuilder()
+      .withMetadata(metadata)
+      .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, value)
+      .build()
+    copy(metadata = newMetadata)
+  }
+
   /**
    * Return the existence default value of this StructField.
    */
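
A small illustrative Scala sketch, mirroring the assertion in the new AlterTableTests case further below, showing the two metadata helpers side by side:

```
import org.apache.spark.sql.types.{IntegerType, StructField}

val field = StructField("b", IntegerType)
  .withCurrentDefaultValue("2 + 3")  // pre-existing helper: default for future inserts
  .withExistenceDefaultValue("5")    // new helper: folded value returned for old rows
```
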
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 80a1a61d78a..3736ba4d785 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -119,7 +119,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
   override def alterTable(ident: Identifier, changes: TableChange*): Table = {
     val table = loadTable(ident).asInstanceOf[InMemoryTable]
     val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
-    val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
+    val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
 
     // fail if the last column in the schema was dropped
     if (schema.fields.isEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index b9a4e0e6ba3..0c144266411 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -149,7 +149,8 @@ class V2SessionCatalog(catalog: SessionCatalog)
     }
 
     val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
-    val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
+    val schema = CatalogV2Util.applySchemaChanges(
+      catalogTable.schema, changes, catalogTable.provider, "ALTER TABLE")
     val comment = properties.get(TableCatalog.PROP_COMMENT)
     val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
     val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 19f3f86c941..df6bfdc319f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
 import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
@@ -310,6 +311,26 @@ trait AlterTableTests extends SharedSparkSession {
     }
   }
 
+  test("SPARK-39383 DEFAULT columns on V2 data sources with ALTER TABLE ADD COLUMN") {
+    withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, ") {
+      val t = s"${catalogAndNamespace}table_name"
+      withTable("t") {
+        sql(s"create table $t (a string) using $v2Format")
+        sql(s"alter table $t add column (b int default 2 + 3)")
+
+        val tableName = fullTableName(t)
+        val table = getTableMetadata(tableName)
+
+        assert(table.name === tableName)
+        assert(table.schema === new StructType()
+          .add("a", StringType)
+          .add(StructField("b", IntegerType)
+            .withCurrentDefaultValue("2 + 3")
+            .withExistenceDefaultValue("5")))
+      }
+    }
+  }
+
   test("AlterTable: add complex column") {
     val t = s"${catalogAndNamespace}table_name"
     withTable(t) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index d0cb8141ad7..835566238c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -110,7 +110,7 @@ class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable
     Option(tables.get(ident)) match {
       case Some(table) =>
         val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
-        val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
+        val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
 
         // fail if the last column in the schema was dropped
         if (schema.fields.isEmpty) {
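
Putting it together, a minimal end-to-end Scala sketch mirroring the new test; it assumes a SparkSession named spark, and "myV2Provider" stands in for a real V2 data source listed in the allowed-providers conf:

```
import org.apache.spark.sql.internal.SQLConf

spark.conf.set(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key, "myV2Provider")
spark.sql("CREATE TABLE t (a STRING) USING myV2Provider")
spark.sql("ALTER TABLE t ADD COLUMN (b INT DEFAULT 2 + 3)")
// Rows written before and after the ALTER both report b = 5 unless a value
// is supplied explicitly.
spark.sql("SELECT * FROM t").show()
```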


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org