Posted to commits@spark.apache.org by ge...@apache.org on 2022/08/18 21:38:04 UTC
[spark] branch master updated: Revert "[SPARK-40000][SQL] Update INSERTs without user-specified fields to not automatically add default values"
This is an automated email from the ASF dual-hosted git repository.
gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 50c163578cf Revert "[SPARK-40000][SQL] Update INSERTs without user-specified fields to not automatically add default values"
50c163578cf is described below
commit 50c163578cfef79002fbdbc54b3b8fc10cfbcf65
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Thu Aug 18 14:37:38 2022 -0700
Revert "[SPARK-40000][SQL] Update INSERTs without user-specified fields to not automatically add default values"
### What changes were proposed in this pull request?
Reverts PR #37430 (commit 13c1b594a8ee6b99544572864b378b3616ffdb58, "Update INSERTs without user-specified fields to not automatically add default values").
This is a clean revert, undoing the original changes from that PR exactly.
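For context, a sketch of the semantics this revert restores (the table and values below are illustrative, not taken from the patch): when a trailing column carries an explicit DEFAULT, an INSERT may supply only a prefix of the columns and the remainder is filled from those defaults, with no extra configuration required:

    -- illustrative only; table name and values are hypothetical
    CREATE TABLE t(i INT, s BIGINT DEFAULT 42) USING PARQUET;
    INSERT INTO t VALUES (1);   -- s receives its DEFAULT value, 42
    SELECT * FROM t;            -- returns (1, 42)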
### Why are the changes needed?
Upon further review, we find that the ability to switch between the new configuration and the old one is not needed and adds complexity.
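In its place, the revert reinstates the single flag `spark.sql.defaultColumn.useNullsForMissingDefaultValues` (restored in the SQLConf hunk below, default false), which governs whether columns lacking an explicit DEFAULT behave as if declared DEFAULT NULL. A hedged sketch mirroring the restored InsertSuite tests:

    -- with spark.sql.defaultColumn.useNullsForMissingDefaultValues = true
    CREATE TABLE t(i BOOLEAN, s BIGINT) USING PARQUET;
    INSERT INTO t (i) VALUES (true);   -- succeeds; s is filled with NULL
    -- with the flag at its default of false, the same INSERT fails analysis,
    -- since column s has no explicit DEFAULT to fall back on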
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit test coverage.
Closes #37572 from dtenedor/revert-pr37430.
Authored-by: Daniel Tenedorio <da...@databricks.com>
Signed-off-by: Gengliang Wang <ge...@apache.org>
---
docs/sql-migration-guide.md | 1 -
.../catalyst/analysis/ResolveDefaultColumns.scala | 42 ++--
.../org/apache/spark/sql/internal/SQLConf.scala | 29 ++-
.../org/apache/spark/sql/SQLInsertTestSuite.scala | 57 +++--
.../org/apache/spark/sql/sources/InsertSuite.scala | 231 ++++++++++-----------
.../org/apache/spark/sql/hive/InsertSuite.scala | 12 +-
6 files changed, 178 insertions(+), 194 deletions(-)
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index c19069cfdba..42df05f7f70 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -26,7 +26,6 @@ license: |
- Since Spark 3.4, Number or Number(\*) from Teradata will be treated as Decimal(38,18). In Spark 3.3 or earlier, Number or Number(\*) from Teradata will be treated as Decimal(38, 0), in which case the fractional part will be removed.
- Since Spark 3.4, v1 database, table, permanent view and function identifier will include 'spark_catalog' as the catalog name if database is defined, e.g. a table identifier will be: `spark_catalog.default.t`. To restore the legacy behavior, set `spark.sql.legacy.v1IdentifierNoCatalog` to `true`.
- - Since Spark 3.4, `INSERT INTO` commands will now support user-specified column lists comprising fewer columns than present in the target table (for example, `INSERT INTO t (a, b) VALUES (1, 2)` where table `t` has three columns). In this case, Spark will insert `NULL` into the remaining columns in the row, or the explicit `DEFAULT` value if assigned to the column. To revert to the previous behavior, please set `spark.sql.defaultColumn.addMissingValuesForInsertsWithExplicitColumns` to false.
- Since Spark 3.4, when ANSI SQL mode(configuration `spark.sql.ansi.enabled`) is on, Spark SQL always returns NULL result on getting a map value with a non-existing key. In Spark 3.3 or earlier, there will be an error.
## Upgrading from Spark SQL 3.2 to 3.3
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index 20ca3c9532a..b7c7f0d3772 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -108,7 +108,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
val regenerated: InsertIntoStatement =
regenerateUserSpecifiedCols(i, schema)
val expanded: LogicalPlan =
- addMissingDefaultValuesForInsertFromInlineTable(node, schema, i.userSpecifiedCols.length)
+ addMissingDefaultValuesForInsertFromInlineTable(node, schema)
val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
replaced.map { r: LogicalPlan =>
@@ -132,7 +132,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema)
val project: Project = i.query.asInstanceOf[Project]
val expanded: Project =
- addMissingDefaultValuesForInsertFromProject(project, schema, i.userSpecifiedCols.length)
+ addMissingDefaultValuesForInsertFromProject(project, schema)
val replaced: Option[LogicalPlan] =
replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
replaced.map { r =>
@@ -265,15 +265,14 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
*/
private def addMissingDefaultValuesForInsertFromInlineTable(
node: LogicalPlan,
- insertTableSchemaWithoutPartitionColumns: StructType,
- numUserSpecifiedFields: Int): LogicalPlan = {
+ insertTableSchemaWithoutPartitionColumns: StructType): LogicalPlan = {
val numQueryOutputs: Int = node match {
case table: UnresolvedInlineTable => table.rows(0).size
case local: LocalRelation => local.data(0).numFields
}
val schema = insertTableSchemaWithoutPartitionColumns
val newDefaultExpressions: Seq[Expression] =
- getDefaultExpressionsForInsert(numQueryOutputs, schema, numUserSpecifiedFields, node)
+ getDefaultExpressionsForInsert(numQueryOutputs, schema)
val newNames: Seq[String] = schema.fields.drop(numQueryOutputs).map { _.name }
node match {
case _ if newDefaultExpressions.isEmpty => node
@@ -299,12 +298,11 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
*/
private def addMissingDefaultValuesForInsertFromProject(
project: Project,
- insertTableSchemaWithoutPartitionColumns: StructType,
- numUserSpecifiedFields: Int): Project = {
+ insertTableSchemaWithoutPartitionColumns: StructType): Project = {
val numQueryOutputs: Int = project.projectList.size
val schema = insertTableSchemaWithoutPartitionColumns
val newDefaultExpressions: Seq[Expression] =
- getDefaultExpressionsForInsert(numQueryOutputs, schema, numUserSpecifiedFields, project)
+ getDefaultExpressionsForInsert(numQueryOutputs, schema)
val newAliases: Seq[NamedExpression] =
newDefaultExpressions.zip(schema.fields).map {
case (expr, field) => Alias(expr, field.name)()
@@ -317,19 +315,20 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
*/
private def getDefaultExpressionsForInsert(
numQueryOutputs: Int,
- schema: StructType,
- numUserSpecifiedFields: Int,
- treeNode: LogicalPlan): Seq[Expression] = {
- if (numUserSpecifiedFields > 0 && numUserSpecifiedFields != numQueryOutputs) {
- throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
- numUserSpecifiedFields, numQueryOutputs, treeNode)
- }
- if (numUserSpecifiedFields > 0 && SQLConf.get.addMissingValuesForInsertsWithExplicitColumns) {
- val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
- val numDefaultExpressionsToAdd = remainingFields.size
- Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
+ schema: StructType): Seq[Expression] = {
+ val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
+ val numDefaultExpressionsToAdd = getStructFieldsForDefaultExpressions(remainingFields).size
+ Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
+ }
+
+ /**
+ * This is a helper for the getDefaultExpressionsForInsert methods above.
+ */
+ private def getStructFieldsForDefaultExpressions(fields: Seq[StructField]): Seq[StructField] = {
+ if (SQLConf.get.useNullsForMissingDefaultColumnValues) {
+ fields
} else {
- Seq.empty[Expression]
+ fields.takeWhile(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY))
}
}
@@ -488,7 +487,8 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
schema.fields.filter {
field => !userSpecifiedColNames.contains(field.name)
}
- Some(StructType(userSpecifiedFields ++ nonUserSpecifiedFields))
+ Some(StructType(userSpecifiedFields ++
+ getStructFieldsForDefaultExpressions(nonUserSpecifiedFields)))
}
/**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cd0c0d25053..3ce6ee47958 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2924,18 +2924,6 @@ object SQLConf {
.stringConf
.createWithDefault("csv,json,orc,parquet")
- val ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS =
- buildConf("spark.sql.defaultColumn.addMissingValuesForInsertsWithExplicitColumns")
- .internal()
- .doc("When true, allow INSERT INTO commands with explicit columns (such as " +
- "INSERT INTO t(a, b)) to specify fewer columns than the target table; the analyzer will " +
- "assign default values for remaining columns (either NULL, or otherwise the explicit " +
- "DEFAULT value associated with the column from a previous command). Otherwise, if " +
- "false, return an error.")
- .version("3.4.0")
- .booleanConf
- .createWithDefault(true)
-
val JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE =
buildConf("spark.sql.jsonGenerator.writeNullIfWithDefaultValue")
.internal()
@@ -2948,6 +2936,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES =
+ buildConf("spark.sql.defaultColumn.useNullsForMissingDefaultValues")
+ .internal()
+ .doc("When true, and DEFAULT columns are enabled, allow column definitions lacking " +
+ "explicit default values to behave as if they had specified DEFAULT NULL instead. " +
+ "For example, this allows most INSERT INTO statements to specify only a prefix of the " +
+ "columns in the target table, and the remaining columns will receive NULL values.")
+ .version("3.4.0")
+ .booleanConf
+ .createWithDefault(false)
+
val ENFORCE_RESERVED_KEYWORDS = buildConf("spark.sql.ansi.enforceReservedKeywords")
.doc(s"When true and '${ANSI_ENABLED.key}' is true, the Spark SQL parser enforces the ANSI " +
"reserved keywords and forbids SQL queries that use reserved keywords as alias names " +
@@ -4531,12 +4530,12 @@ class SQLConf extends Serializable with Logging {
def defaultColumnAllowedProviders: String = getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
- def addMissingValuesForInsertsWithExplicitColumns: Boolean =
- getConf(SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS)
-
def jsonWriteNullIfWithDefaultValue: Boolean =
getConf(JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE)
+ def useNullsForMissingDefaultColumnValues: Boolean =
+ getConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES)
+
def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS)
def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 9d5f75bad38..7fd6a5dbea0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -175,48 +175,45 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
test("insert with column list - mismatched column list size") {
val msgs = Seq("Cannot write to table due to mismatched user specified column size",
"expected 3 columns but found")
- withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false",
- SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+ def test: Unit = {
withTable("t1") {
val cols = Seq("c1", "c2", "c3")
createTable("t1", cols, Seq("int", "long", "string"))
- Seq(
- "INSERT INTO t1 (c1, c2) values(1, 2, 3)",
- "INSERT INTO t1 (c1, c2) select 1, 2, 3",
- "INSERT INTO t1 (c1, c2, c3) values(1, 2)",
- "INSERT INTO t1 (c1, c2, c3) select 1, 2"
- ).foreach { query =>
- val e = intercept[AnalysisException](sql(query))
- assert(e.getMessage.contains(msgs(0)) || e.getMessage.contains(msgs(1)))
- }
+ val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)"))
+ assert(e1.getMessage.contains(msgs(0)) || e1.getMessage.contains(msgs(1)))
+ val e2 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)"))
+ assert(e2.getMessage.contains(msgs(0)) || e2.getMessage.contains(msgs(1)))
}
}
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+ test
+ }
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+ test
+ }
}
test("insert with column list - mismatched target table out size after rewritten query") {
- val v2Msg = "Cannot write to table due to mismatched user specified column size"
+ val v2Msg = "expected 2 columns but found"
val cols = Seq("c1", "c2", "c3", "c4")
- withSQLConf(
- SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "false") {
- withTable("t1") {
- createTable("t1", cols, Seq.fill(4)("int"))
- val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)"))
- assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") ||
- e1.getMessage.contains("expected 4 columns but found 1") ||
- e1.getMessage.contains("not enough data columns") ||
- e1.getMessage.contains(v2Msg))
- }
+ withTable("t1") {
+ createTable("t1", cols, Seq.fill(4)("int"))
+ val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)"))
+ assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") ||
+ e1.getMessage.contains("expected 4 columns but found 1") ||
+ e1.getMessage.contains("not enough data columns") ||
+ e1.getMessage.contains(v2Msg))
+ }
- withTable("t1") {
- createTable("t1", cols, Seq.fill(4)("int"), cols.takeRight(2))
- val e1 = intercept[AnalysisException] {
- sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
- }
- assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 3") ||
- e1.getMessage.contains("not enough data columns") ||
- e1.getMessage.contains(v2Msg))
+ withTable("t1") {
+ createTable("t1", cols, Seq.fill(4)("int"), cols.takeRight(2))
+ val e1 = intercept[AnalysisException] {
+ sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
}
+ assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 3") ||
+ e1.getMessage.contains("not enough data columns") ||
+ e1.getMessage.contains(v2Msg))
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index afe3ac6facc..3936f2b995c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -27,7 +27,6 @@ import org.apache.spark.SparkException
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.DataSourceUtils
@@ -864,8 +863,13 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
}
test("Allow user to insert specified columns into insertable view") {
- withSQLConf(
- SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+ sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+ checkAnswer(
+ sql("SELECT a, b FROM jsonTable"),
+ (1 to 10).map(i => Row(i, null))
+ )
+
sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
checkAnswer(
sql("SELECT a, b FROM jsonTable"),
@@ -878,9 +882,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
(1 to 10).map(i => Row(null, s"str$i"))
)
}
+
+ val message = intercept[AnalysisException] {
+ sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
+ }.getMessage
+ assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
}
test("SPARK-38336 INSERT INTO statements with tables with default columns: positive tests") {
+ // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no
+ // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+ // values than expected, NULL values are appended in their place.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+ withTable("t") {
+ sql("create table t(i boolean, s bigint) using parquet")
+ sql("insert into t values(true)")
+ checkAnswer(spark.table("t"), Row(true, null))
+ }
+ }
// The default value for the DEFAULT keyword is the NULL literal.
withTable("t") {
sql("create table t(i boolean, s bigint) using parquet")
@@ -896,13 +915,13 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
// The default value parses correctly and the provided value type is different but coercible.
withTable("t") {
sql("create table t(i boolean, s bigint default 42) using parquet")
- sql("insert into t (i) values(false)")
+ sql("insert into t values(false)")
checkAnswer(spark.table("t"), Row(false, 42L))
}
// There are two trailing default values referenced implicitly by the INSERT INTO statement.
withTable("t") {
sql("create table t(i int, s bigint default 42, x bigint default 43) using parquet")
- sql("insert into t(i) values(1)")
+ sql("insert into t values(1)")
checkAnswer(sql("select s + x from t where i = 1"), Seq(85L).map(i => Row(i)))
}
// The table has a partitioning column and a default value is injected.
@@ -977,47 +996,37 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
checkAnswer(spark.table("t"), Row(false, 42L))
}
// There are three column types exercising various combinations of implicit and explicit
- // default column value references in the 'insert into' statements.
- withSQLConf(
- SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+ // default column value references in the 'insert into' statements. Note these tests depend on
+ // enabling the configuration to use NULLs for missing DEFAULT column values.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
for (useDataFrames <- Seq(false, true)) {
withTable("t1", "t2") {
sql("create table t1(j int, s bigint default 42, x bigint default 43) using parquet")
if (useDataFrames) {
- Seq((1, 42, 43)).toDF.write.insertInto("t1")
- Seq((2, 42, 43)).toDF.write.insertInto("t1")
- Seq((3, 42, 43)).toDF.write.insertInto("t1")
- Seq((4, 44, 43)).toDF.write.insertInto("t1")
+ Seq((1)).toDF.write.insertInto("t1")
+ Seq((2)).toDF.write.insertInto("t1")
+ Seq((3)).toDF.write.insertInto("t1")
+ Seq((4, 44)).toDF.write.insertInto("t1")
Seq((5, 44, 45)).toDF.write.insertInto("t1")
} else {
- sql("insert into t1(j) values(1)")
- sql("insert into t1(j, s) values(2, default)")
+ sql("insert into t1 values(1)")
+ sql("insert into t1 values(2, default)")
sql("insert into t1 values(3, default, default)")
- sql("insert into t1(j, s) values(4, 44)")
+ sql("insert into t1 values(4, 44)")
sql("insert into t1 values(5, 44, 45)")
}
sql("create table t2(j int, s bigint default 42, x bigint default 43) using parquet")
if (useDataFrames) {
- spark.table("t1").where("j = 1").select("j")
- .withColumns(Seq("s", "x"), Seq(Column(Literal(42)), Column(Literal(43))))
- .write.insertInto("t2")
- spark.table("t1").where("j = 2").select("j")
- .withColumns(Seq("s", "x"), Seq(Column(Literal(42)), Column(Literal(43))))
- .write.insertInto("t2")
- spark.table("t1").where("j = 3").select("j")
- .withColumns(Seq("s", "x"), Seq(Column(Literal(42)), Column(Literal(43))))
- .write.insertInto("t2")
- spark.table("t1").where("j = 4").select("j", "s")
- .withColumn("x", Column(Literal(43)))
- .write.insertInto("t2")
- spark.table("t1").where("j = 5").select("j", "s")
- .withColumn("x", Column(Literal(43)))
- .write.insertInto("t2")
+ spark.table("t1").where("j = 1").select("j").write.insertInto("t2")
+ spark.table("t1").where("j = 2").select("j").write.insertInto("t2")
+ spark.table("t1").where("j = 3").select("j").write.insertInto("t2")
+ spark.table("t1").where("j = 4").select("j", "s").write.insertInto("t2")
+ spark.table("t1").where("j = 5").select("j", "s").write.insertInto("t2")
} else {
- sql("insert into t2(j) select j from t1 where j = 1")
- sql("insert into t2(j, s) select j, default from t1 where j = 2")
+ sql("insert into t2 select j from t1 where j = 1")
+ sql("insert into t2 select j, default from t1 where j = 2")
sql("insert into t2 select j, default, default from t1 where j = 3")
- sql("insert into t2(j, s) select j, s from t1 where j = 4")
+ sql("insert into t2 select j, s from t1 where j = 4")
sql("insert into t2 select j, s, default from t1 where j = 5")
}
checkAnswer(
@@ -1119,21 +1128,20 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
sql("create table t(i int, s bigint default 42, x bigint) using parquet")
assert(intercept[AnalysisException] {
sql("insert into t values(1)")
- }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
+ }.getMessage.contains("expected 3 columns but found"))
}
// The table has a partitioning column with a default value; this is not allowed.
withTable("t") {
sql("create table t(i boolean default true, s bigint, q int default 42) " +
"using parquet partitioned by (i)")
assert(intercept[ParseException] {
- sql("insert into t partition(i=default) (s, q) values(5, default)")
+ sql("insert into t partition(i=default) values(5, default)")
}.getMessage.contains(
"References to DEFAULT column values are not allowed within the PARTITION clause"))
}
- // The INSERT INTO statement has no user specified columns and fewer values than the number of
- // columns in the target table.
- withSQLConf(
- SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+ // The configuration option to append missing NULL values to the end of the INSERT INTO
+ // statement is not enabled.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
withTable("t") {
sql("create table t(i boolean, s bigint) using parquet")
assert(intercept[AnalysisException] {
@@ -1171,16 +1179,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
Row(4, 43, false),
Row(4, 42, false)))
}
- // When the INSERT INTO statement has user specified columns, and no explicit DEFAULT value is
- // available when the INSERT INTO statement provides fewer values than expected, NULL values are
- // appended in their place.
- withTable("t") {
- sql("create table t(i boolean, s bigint) using parquet")
- sql("insert into t (i) values (true)")
- checkAnswer(spark.table("t"), Row(true, null))
+ // When the CASE_SENSITIVE configuration is disabled, then using different cases for the
+ // required and provided column names is successful.
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42, q int default 43) using parquet")
+ sql("insert into t (I, Q) select true from (select 1)")
+ checkAnswer(spark.table("t"), Row(true, 42L, 43))
+ }
}
- withSQLConf(
- SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+ // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no
+ // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+ // values than expected, NULL values are appended in their place.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+ withTable("t") {
+ sql("create table t(i boolean, s bigint) using parquet")
+ sql("insert into t (i) values (true)")
+ checkAnswer(spark.table("t"), Row(true, null))
+ }
withTable("t") {
sql("create table t(i boolean default true, s bigint) using parquet")
sql("insert into t (i) values (default)")
@@ -1211,71 +1227,65 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
withTable("t") {
sql("create table t(i boolean, s bigint) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (true)")
+ sql("insert into t (i) values (true)")
}.getMessage.contains(addOneColButExpectedTwo))
}
withTable("t") {
sql("create table t(i boolean default true, s bigint) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (default)")
+ sql("insert into t (i) values (default)")
}.getMessage.contains(addOneColButExpectedTwo))
}
withTable("t") {
sql("create table t(i boolean, s bigint default 42) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (default)")
+ sql("insert into t (s) values (default)")
}.getMessage.contains(addOneColButExpectedTwo))
}
withTable("t") {
sql("create table t(i boolean, s bigint, q int default 43) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t select true from (select 1)")
- }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
- }
- withTable("t") {
- sql("create table t(i boolean default true, s bigint default 42) using parquet")
- assert(intercept[AnalysisException] {
- sql("insert into t values (default)")
- }.getMessage.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
+ sql("insert into t (i, q) select true from (select 1)")
+ }.getMessage.contains(addTwoColButExpectedThree))
}
- // When no explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+ // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no
+ // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
// values than expected, the INSERT INTO command fails to execute.
- withSQLConf(
- SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
withTable("t") {
sql("create table t(i boolean, s bigint) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (true)")
+ sql("insert into t (i) values (true)")
}.getMessage.contains(addOneColButExpectedTwo))
}
withTable("t") {
sql("create table t(i boolean default true, s bigint) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (default)")
+ sql("insert into t (i) values (default)")
}.getMessage.contains(addOneColButExpectedTwo))
}
withTable("t") {
sql("create table t(i boolean, s bigint default 42) using parquet")
assert(intercept[AnalysisException] {
- sql("insert into t values (default)")
+ sql("insert into t (s) values (default)")
}.getMessage.contains(addOneColButExpectedTwo))
}
withTable("t") {
sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
assert(intercept[AnalysisException] {
- sql("insert into t partition(i='true') values(5)")
+ sql("insert into t partition(i='true') (s) values(5)")
}.getMessage.contains(addTwoColButExpectedThree))
}
withTable("t") {
sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
assert(intercept[AnalysisException] {
- sql("insert into t partition(i='false') select 43")
+ sql("insert into t partition(i='false') (q) select 43")
}.getMessage.contains(addTwoColButExpectedThree))
}
withTable("t") {
sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
assert(intercept[AnalysisException] {
- sql("insert into t partition(i='false') select default")
+ sql("insert into t partition(i='false') (q) select default")
}.getMessage.contains(addTwoColButExpectedThree))
}
}
@@ -1286,18 +1296,9 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
sql("create table t(i boolean default true, s bigint default 42) using parquet")
assert(intercept[AnalysisException] {
sql("insert into t (I) select true from (select 1)")
- }.getMessage.contains("A column or function parameter with name `I` cannot be resolved"))
- }
- }
- withSQLConf(
- SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "false") {
- withTable("t") {
- sql("create table t(i boolean, s bigint default 42) using parquet")
- assert(intercept[AnalysisException] {
- sql("insert into t(i) values (default)")
}.getMessage.contains(
- "Cannot write to table due to mismatched user specified column size(2) " +
- "and data column size(1)"))
+ "[UNRESOLVED_COLUMN] A column or function parameter with name `I` cannot be resolved. " +
+ "Did you mean one of the following? [`i`, `s`]"))
}
}
}
@@ -1317,14 +1318,14 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
sql(createTableIntCol)
sql("alter table t add column s bigint default 42")
sql("alter table t add column x bigint default 43")
- sql("insert into t (i) values(1)")
+ sql("insert into t values(1)")
checkAnswer(spark.table("t"), Row(1, 42, 43))
}
// There are two trailing default values referenced implicitly by the INSERT INTO statement.
withTable("t") {
sql(createTableIntCol)
sql("alter table t add columns s bigint default 42, x bigint default 43")
- sql("insert into t (i) values(1)")
+ sql("insert into t values(1)")
checkAnswer(spark.table("t"), Row(1, 42, 43))
}
// The table has a partitioning column and a default value is injected.
@@ -1382,24 +1383,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
checkAnswer(spark.table("t"), Row(false, 1))
}
// There are three column types exercising various combinations of implicit and explicit
- // default column value references in the 'insert into' statements.
- withSQLConf(
- SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+ // default column value references in the 'insert into' statements. Note these tests depend on
+ // enabling the configuration to use NULLs for missing DEFAULT column values.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
withTable("t1", "t2") {
sql("create table t1(j int) using parquet")
sql("alter table t1 add column s bigint default 42")
sql("alter table t1 add column x bigint default 43")
- sql("insert into t1(j) values(1)")
- sql("insert into t1(j, s) values(2, default)")
+ sql("insert into t1 values(1)")
+ sql("insert into t1 values(2, default)")
sql("insert into t1 values(3, default, default)")
- sql("insert into t1(j, s) values(4, 44)")
+ sql("insert into t1 values(4, 44)")
sql("insert into t1 values(5, 44, 45)")
sql("create table t2(j int) using parquet")
sql("alter table t2 add columns s bigint default 42, x bigint default 43")
- sql("insert into t2(j) select j from t1 where j = 1")
- sql("insert into t2(j, s) select j, default from t1 where j = 2")
+ sql("insert into t2 select j from t1 where j = 1")
+ sql("insert into t2 select j, default from t1 where j = 2")
sql("insert into t2 select j, default, default from t1 where j = 3")
- sql("insert into t2(j, s) select j, s from t1 where j = 4")
+ sql("insert into t2 select j, s from t1 where j = 4")
sql("insert into t2 select j, s, default from t1 where j = 5")
checkAnswer(
spark.table("t2"),
@@ -1462,7 +1463,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
sql("alter table t add column x bigint")
assert(intercept[AnalysisException] {
sql("insert into t values(1)")
- }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
+ }.getMessage.contains("expected 3 columns but found"))
}
}
@@ -1538,13 +1539,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
insertNullsToStorage: Boolean = true,
useDataFrames: Boolean = false)
def runTest(dataSource: String, config: Config): Unit = {
- def withTableT(f: => Unit): Unit = {
- sql(s"create table t(a string, i int) using $dataSource")
+ def insertIntoT(): Unit = {
if (config.useDataFrames) {
Seq(("xyz", 42)).toDF.write.insertInto("t")
} else {
- sql("insert into t (a, i) values('xyz', 42)")
+ sql("insert into t values('xyz', 42)")
}
+ }
+ def withTableT(f: => Unit): Unit = {
+ sql(s"create table t(a string, i int) using $dataSource")
+ insertIntoT
withTable("t") { f }
}
// Positive tests:
@@ -1568,7 +1572,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
if (config.useDataFrames) {
Seq((null, null, null)).toDF.write.insertInto("t")
} else {
- sql("insert into t (a, i, s) values(null, null, null)")
+ sql("insert into t values(null, null, null)")
}
sql("alter table t add column (x boolean default true)")
// By default, INSERT commands into some tables (such as JSON) do not store NULL values.
@@ -1609,17 +1613,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
"a3 bigint default 43," +
"a4 smallint default cast(5 as smallint)," +
"a5 tinyint default cast(6 as tinyint))")
- if (config.useDataFrames) {
- sql("select 'xyz', 42, true, cast(null as byte), cast(42 as short), 0, 0, " +
- "cast('2021-01-02' as date), " +
- "cast('2021-01-02 01:01:01' as timestamp), " +
- "cast('2021-01-02 01:01:01' as timestamp_ntz), " +
- "cast('2021-01-02 01:01:01' as timestamp_ltz), " +
- "cast(123.45 as decimal(5, 2)), 43, cast(5 as smallint), cast(6 as tinyint)")
- .write.insertInto("t")
- } else {
- sql("insert into t (a, i) values('xyz', 42)")
- }
+ insertIntoT()
// Manually inspect the result row values rather than using the 'checkAnswer' helper method
// in order to ensure the values' correctness while avoiding minor type incompatibilities.
val result: Array[Row] =
@@ -1891,15 +1885,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
array(
map(false, 'def', true, 'jkl'))))
using ${config.dataSource}""")
- def namedStructSql(colA: Integer, colB: Integer, key1: String, key2: String): String = {
- "named_struct(" +
- "'x', array(" +
- s"named_struct('a', $colA, 'b', $colB)), " +
- "'y', array(" +
- s"map(false, '$key1', true, '$key2')))"
- }
if (config.useDataFrames) {
- sql("select 1, " + namedStructSql(1, 2, "def", "jkl")).write.insertInto("t")
+ Seq((1)).toDF.write.insertInto("t")
} else {
sql("insert into t select 1, default")
}
@@ -1918,7 +1905,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
array(
map(false, 'mno', true, 'pqr')))""")
if (config.useDataFrames) {
- sql("select 3, " + namedStructSql(3, 4, "mno", "pqr")).write.insertInto("t")
+ Seq((3)).toDF.write.insertInto("t")
} else {
sql("insert into t select 3, default")
}
@@ -1930,12 +1917,9 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
default array(
map(true, 'xyz'))""")
if (config.useDataFrames) {
- sql("select 4, " + namedStructSql(3, 4, "mno", "pqr") + "," +
- "array(" +
- "map(true, 'xyz'))")
- .write.insertInto("t")
+ Seq((4)).toDF.write.insertInto("t")
} else {
- sql("insert into t(i, s) select 4, default")
+ sql("insert into t select 4, default")
}
checkAnswer(spark.table("t"),
Seq(
@@ -2256,13 +2240,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
checkAnswer(spark.table("t1"), Row(1, "str1"))
}
- withSQLConf(
- SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
withTable("t1") {
sql("CREATE TABLE t1(c1 int, c2 string, c3 int) using parquet")
- sql("INSERT INTO TABLE t1 (c1, c2) select * from jt where a=1")
+ sql("INSERT INTO TABLE t1 select * from jt where a=1")
checkAnswer(spark.table("t1"), Row(1, "str1", null))
- sql("INSERT INTO TABLE t1 (c1, c2, c3) select *, 2 from jt where a=2")
+ sql("INSERT INTO TABLE t1 select *, 2 from jt where a=2")
checkAnswer(spark.table("t1"), Seq(Row(1, "str1", null), Row(2, "str2", 2)))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index cfcc6b6b6b4..a7148e9c921 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -906,9 +906,15 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
}
}
}
-
- withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false",
- SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+ testDefaultColumn
+ }
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+ SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+ testDefaultColumn
+ }
+ withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+ SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
testDefaultColumn
}
}