You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2022/06/21 17:09:04 UTC
[spark] branch master updated: [SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ADD COLUMNS to V2 data sources
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new db0e972c09c [SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ADD COLUMNS to V2 data sources
db0e972c09c is described below
commit db0e972c09c7ef22f46a6302bf969d85921cf0cb
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Tue Jun 21 10:08:39 2022 -0700
[SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ADD COLUMNS to V2 data sources
### What changes were proposed in this pull request?
Extend DEFAULT column support in ALTER TABLE ADD COLUMNS commands to include V2 data sources.
Example:
```
> create or replace table t (a string default 'abc') using $v2Source
> insert into t values (default)
> alter table t add column (b string default 'def')
> insert into t values ("ghi")
> select * from t
"abc", "def"
"ghi", "def"
```
### Why are the changes needed?
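Previously, a DEFAULT clause in ALTER TABLE ADD COLUMNS could not be passed through to V2 data sources, because `TableChange.addColumn` had no way to carry a default value. Supporting it makes V2 data sources easier to use and extend.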
### Does this PR introduce _any_ user-facing change?
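Yes. ALTER TABLE ADD COLUMNS on tables backed by V2 data sources now honors a DEFAULT clause. The column's default expression is recorded in the table schema, together with its constant-folded value, which is returned for the new column when reading rows written before the column was added.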
### How was this patch tested?
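This PR adds a new test case to `AlterTableTests` ("SPARK-39383 DEFAULT columns on V2 data sources with ALTER TABLE ADD COLUMN"). It checks that adding a column with `default 2 + 3` stores both the current default ("2 + 3") and the existence default ("5") in the table schema.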
Closes #36771 from dtenedor/default-cols-v2-tables.
Authored-by: Daniel Tenedorio <da...@databricks.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
.../spark/sql/connector/catalog/TableChange.java | 23 +++++++++-----
.../plans/logical/v2AlterTableCommands.scala | 6 ++--
.../sql/connector/catalog/CatalogV2Util.scala | 35 +++++++++++++++-------
.../org/apache/spark/sql/types/StructField.scala | 11 +++++++
.../connector/catalog/InMemoryTableCatalog.scala | 2 +-
.../datasources/v2/V2SessionCatalog.scala | 3 +-
.../spark/sql/connector/AlterTableTests.scala | 21 +++++++++++++
.../DataSourceV2DataFrameSessionCatalogSuite.scala | 2 +-
8 files changed, 81 insertions(+), 22 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index 72dbf6ca07a..b0b686942c6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -79,7 +79,7 @@ public interface TableChange {
* @return a TableChange for the addition
*/
static TableChange addColumn(String[] fieldNames, DataType dataType) {
- return new AddColumn(fieldNames, dataType, true, null, null);
+ return new AddColumn(fieldNames, dataType, true, null, null, null);
}
/**
@@ -95,7 +95,7 @@ public interface TableChange {
* @return a TableChange for the addition
*/
static TableChange addColumn(String[] fieldNames, DataType dataType, boolean isNullable) {
- return new AddColumn(fieldNames, dataType, isNullable, null, null);
+ return new AddColumn(fieldNames, dataType, isNullable, null, null, null);
}
/**
@@ -116,7 +116,7 @@ public interface TableChange {
DataType dataType,
boolean isNullable,
String comment) {
- return new AddColumn(fieldNames, dataType, isNullable, comment, null);
+ return new AddColumn(fieldNames, dataType, isNullable, comment, null, null);
}
/**
@@ -131,6 +131,7 @@ public interface TableChange {
* @param isNullable whether the new column can contain null
* @param comment the new field's comment string
* @param position the new columns's position
+ * @param defaultValue default value to return when scanning from the new column, if any
* @return a TableChange for the addition
*/
static TableChange addColumn(
@@ -138,8 +139,9 @@ public interface TableChange {
DataType dataType,
boolean isNullable,
String comment,
- ColumnPosition position) {
- return new AddColumn(fieldNames, dataType, isNullable, comment, position);
+ ColumnPosition position,
+ String defaultValue) {
+ return new AddColumn(fieldNames, dataType, isNullable, comment, position, defaultValue);
}
/**
@@ -378,18 +380,21 @@ public interface TableChange {
private final boolean isNullable;
private final String comment;
private final ColumnPosition position;
+ private final String defaultValue;
private AddColumn(
String[] fieldNames,
DataType dataType,
boolean isNullable,
String comment,
- ColumnPosition position) {
+ ColumnPosition position,
+ String defaultValue) {
this.fieldNames = fieldNames;
this.dataType = dataType;
this.isNullable = isNullable;
this.comment = comment;
this.position = position;
+ this.defaultValue = defaultValue;
}
@Override
@@ -415,6 +420,9 @@ public interface TableChange {
return position;
}
+ @Nullable
+ public String defaultValue() { return defaultValue; }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -424,7 +432,8 @@ public interface TableChange {
Arrays.equals(fieldNames, addColumn.fieldNames) &&
dataType.equals(addColumn.dataType) &&
Objects.equals(comment, addColumn.comment) &&
- Objects.equals(position, addColumn.position);
+ Objects.equals(position, addColumn.position) &&
+ Objects.equals(defaultValue, addColumn.defaultValue);
}
@Override
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index fa497658896..e72ce8d421b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -119,7 +119,8 @@ case class AddColumns(
col.dataType,
col.nullable,
col.comment.orNull,
- col.position.map(_.position).orNull)
+ col.position.map(_.position).orNull,
+ col.default.orNull)
}
}
@@ -154,7 +155,8 @@ case class ReplaceColumns(
col.dataType,
col.nullable,
col.comment.orNull,
- null)
+ null,
+ col.default.orNull)
}
deleteChanges ++ addChanges
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index 4c174ad7c4f..d47ffcc2a71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
@@ -130,23 +131,34 @@ private[sql] object CatalogV2Util {
/**
* Apply schema changes to a schema and return the result.
*/
- def applySchemaChanges(schema: StructType, changes: Seq[TableChange]): StructType = {
+ def applySchemaChanges(
+ schema: StructType,
+ changes: Seq[TableChange],
+ tableProvider: Option[String],
+ statementType: String): StructType = {
changes.foldLeft(schema) { (schema, change) =>
change match {
case add: AddColumn =>
add.fieldNames match {
case Array(name) =>
val field = StructField(name, add.dataType, nullable = add.isNullable)
- val newField = Option(add.comment).map(field.withComment).getOrElse(field)
- addField(schema, newField, add.position())
-
+ val fieldWithDefault: StructField =
+ Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+ val fieldWithComment: StructField =
+ Option(add.comment).map(fieldWithDefault.withComment).getOrElse(fieldWithDefault)
+ addField(schema, fieldWithComment, add.position(), tableProvider, statementType)
case names =>
replace(schema, names.init, parent => parent.dataType match {
case parentType: StructType =>
val field = StructField(names.last, add.dataType, nullable = add.isNullable)
- val newField = Option(add.comment).map(field.withComment).getOrElse(field)
- Some(parent.copy(dataType = addField(parentType, newField, add.position())))
-
+ val fieldWithDefault: StructField =
+ Option(add.defaultValue).map(field.withCurrentDefaultValue).getOrElse(field)
+ val fieldWithComment: StructField =
+ Option(add.comment).map(fieldWithDefault.withComment)
+ .getOrElse(fieldWithDefault)
+ Some(parent.copy(dataType =
+ addField(parentType, fieldWithComment, add.position(), tableProvider,
+ statementType)))
case _ =>
throw new IllegalArgumentException(s"Not a struct: ${names.init.last}")
})
@@ -176,7 +188,7 @@ private[sql] object CatalogV2Util {
throw new IllegalArgumentException("Field not found: " + name)
}
val withFieldRemoved = StructType(struct.fields.filter(_ != oldField))
- addField(withFieldRemoved, oldField, update.position())
+ addField(withFieldRemoved, oldField, update.position(), tableProvider, statementType)
}
update.fieldNames() match {
@@ -204,8 +216,10 @@ private[sql] object CatalogV2Util {
private def addField(
schema: StructType,
field: StructField,
- position: ColumnPosition): StructType = {
- if (position == null) {
+ position: ColumnPosition,
+ tableProvider: Option[String],
+ statementType: String): StructType = {
+ val newSchema: StructType = if (position == null) {
schema.add(field)
} else if (position.isInstanceOf[First]) {
StructType(field +: schema.fields)
@@ -218,6 +232,7 @@ private[sql] object CatalogV2Util {
val (before, after) = schema.fields.splitAt(fieldIndex + 1)
StructType(before ++ (field +: after))
}
+ constantFoldCurrentDefaultsToExistDefaults(newSchema, tableProvider, statementType)
}
private def replace(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
index 1fdde3e5219..1869ac641b0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -118,6 +118,17 @@ case class StructField(
}
}
+ /**
+ * Updates the StructField with a new existence default value.
+ */
+ def withExistenceDefaultValue(value: String): StructField = {
+ val newMetadata = new MetadataBuilder()
+ .withMetadata(metadata)
+ .putString(EXISTS_DEFAULT_COLUMN_METADATA_KEY, value)
+ .build()
+ copy(metadata = newMetadata)
+ }
+
/**
* Return the existence default value of this StructField.
*/
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 80a1a61d78a..3736ba4d785 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -119,7 +119,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
override def alterTable(ident: Identifier, changes: TableChange*): Table = {
val table = loadTable(ident).asInstanceOf[InMemoryTable]
val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
- val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
+ val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
// fail if the last column in the schema was dropped
if (schema.fields.isEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index b9a4e0e6ba3..0c144266411 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -149,7 +149,8 @@ class V2SessionCatalog(catalog: SessionCatalog)
}
val properties = CatalogV2Util.applyPropertiesChanges(catalogTable.properties, changes)
- val schema = CatalogV2Util.applySchemaChanges(catalogTable.schema, changes)
+ val schema = CatalogV2Util.applySchemaChanges(
+ catalogTable.schema, changes, catalogTable.provider, "ALTER TABLE")
val comment = properties.get(TableCatalog.PROP_COMMENT)
val owner = properties.getOrElse(TableCatalog.PROP_OWNER, catalogTable.owner)
val location = properties.get(TableCatalog.PROP_LOCATION).map(CatalogUtils.stringToURI)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 19f3f86c941..df6bfdc319f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
import org.apache.spark.sql.connector.catalog.Table
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -310,6 +311,26 @@ trait AlterTableTests extends SharedSparkSession {
}
}
+ test("SPARK-39383 DEFAULT columns on V2 data sources with ALTER TABLE ADD COLUMN") {
+ withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, ") {
+ val t = s"${catalogAndNamespace}table_name"
+ withTable("t") {
+ sql(s"create table $t (a string) using $v2Format")
+ sql(s"alter table $t add column (b int default 2 + 3)")
+
+ val tableName = fullTableName(t)
+ val table = getTableMetadata(tableName)
+
+ assert(table.name === tableName)
+ assert(table.schema === new StructType()
+ .add("a", StringType)
+ .add(StructField("b", IntegerType)
+ .withCurrentDefaultValue("2 + 3")
+ .withExistenceDefaultValue("5")))
+ }
+ }
+ }
+
test("AlterTable: add complex column") {
val t = s"${catalogAndNamespace}table_name"
withTable(t) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index d0cb8141ad7..835566238c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -110,7 +110,7 @@ class InMemoryTableSessionCatalog extends TestV2SessionCatalogBase[InMemoryTable
Option(tables.get(ident)) match {
case Some(table) =>
val properties = CatalogV2Util.applyPropertiesChanges(table.properties, changes)
- val schema = CatalogV2Util.applySchemaChanges(table.schema, changes)
+ val schema = CatalogV2Util.applySchemaChanges(table.schema, changes, None, "ALTER TABLE")
// fail if the last column in the schema was dropped
if (schema.fields.isEmpty) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org