You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2023/03/02 04:34:54 UTC

[spark] branch branch-3.4 updated: [SPARK-42521][SQL] Add NULLs for INSERTs with user-specified lists of fewer columns than the target table

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d8fa508e530 [SPARK-42521][SQL] Add NULLs for INSERTs with user-specified lists of fewer columns than the target table
d8fa508e530 is described below

commit d8fa508e53042dfdbfef49a7b9c3b25a82c68677
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Wed Mar 1 20:33:53 2023 -0800

    [SPARK-42521][SQL] Add NULLs for INSERTs with user-specified lists of fewer columns than the target table
    
    ### What changes were proposed in this pull request?
    
    Add NULLs for INSERTs with user-specified lists of fewer columns than the target table.
    
    This is done by updating the semantics of the `USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES` SQLConf to only apply for INSERTs with explicit user-specific column lists, and changing it to true by default.
    
    ### Why are the changes needed?
    
    This behavior is consistent with other query engines.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, per above.
    
    ### How was this patch tested?
    
    Unit test coverage in `InsertSuite`.
    
    Closes #40229 from dtenedor/defaults-insert-nulls.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
    (cherry picked from commit d2a527a545c57c7fe1a736016b4e26666d2e571b)
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 docs/sql-migration-guide.md                        |   1 +
 .../catalyst/analysis/ResolveDefaultColumns.scala  |  35 ++---
 .../org/apache/spark/sql/internal/SQLConf.scala    |   9 +-
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  |   4 +-
 .../spark/sql/connector/DataSourceV2SQLSuite.scala |  16 +--
 .../org/apache/spark/sql/sources/InsertSuite.scala | 142 ++++++++-------------
 6 files changed, 85 insertions(+), 122 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 7eda9c8de92..0181ff95cd6 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -24,6 +24,7 @@ license: |
 
 ## Upgrading from Spark SQL 3.3 to 3.4
   
+  - Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target table will automatically add the corresponding default values for the remaining columns (or NULL for any column lacking an explicitly-assigned default value). In Spark 3.3 or earlier, these commands would have failed returning errors reporting that the number of provided columns does not match the number of columns in the target table. Note that disabling `spark.sql.defaultColumn [...]
   - Since Spark 3.4, Number or Number(\*) from Teradata will be treated as Decimal(38,18). In Spark 3.3 or earlier, Number or Number(\*) from Teradata will be treated as Decimal(38, 0), in which case the fractional part will be removed.
   - Since Spark 3.4, v1 database, table, permanent view and function identifier will include 'spark_catalog' as the catalog name if database is defined, e.g. a table identifier will be: `spark_catalog.default.t`. To restore the legacy behavior, set `spark.sql.legacy.v1IdentifierNoCatalog` to `true`.
   - Since Spark 3.4, when ANSI SQL mode(configuration `spark.sql.ansi.enabled`) is on, Spark SQL always returns NULL result on getting a map value with a non-existing key. In Spark 3.3 or earlier, there will be an error.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index a04844c6526..6afb51ba81e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -108,7 +108,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
       val regenerated: InsertIntoStatement =
         regenerateUserSpecifiedCols(i, schema)
       val expanded: LogicalPlan =
-        addMissingDefaultValuesForInsertFromInlineTable(node, schema)
+        addMissingDefaultValuesForInsertFromInlineTable(node, schema, i.userSpecifiedCols.size)
       val replaced: Option[LogicalPlan] =
         replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
       replaced.map { r: LogicalPlan =>
@@ -132,7 +132,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
       val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema)
       val project: Project = i.query.asInstanceOf[Project]
       val expanded: Project =
-        addMissingDefaultValuesForInsertFromProject(project, schema)
+        addMissingDefaultValuesForInsertFromProject(project, schema, i.userSpecifiedCols.size)
       val replaced: Option[LogicalPlan] =
         replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
       replaced.map { r =>
@@ -273,15 +273,16 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    */
   private def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
-      insertTableSchemaWithoutPartitionColumns: StructType): LogicalPlan = {
-    val numQueryOutputs: Int = node match {
-      case table: UnresolvedInlineTable => table.rows(0).size
-      case local: LocalRelation => local.data(0).numFields
-    }
+      insertTableSchemaWithoutPartitionColumns: StructType,
+      numUserSpecifiedColumns: Int): LogicalPlan = {
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
-      getDefaultExpressionsForInsert(numQueryOutputs, schema)
-    val newNames: Seq[String] = schema.fields.drop(numQueryOutputs).map { _.name }
+      getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
+    val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) {
+      schema.fields.drop(numUserSpecifiedColumns).map(_.name)
+    } else {
+      schema.fields.map(_.name)
+    }
     node match {
       case _ if newDefaultExpressions.isEmpty => node
       case table: UnresolvedInlineTable =>
@@ -306,11 +307,11 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    */
   private def addMissingDefaultValuesForInsertFromProject(
       project: Project,
-      insertTableSchemaWithoutPartitionColumns: StructType): Project = {
-    val numQueryOutputs: Int = project.projectList.size
+      insertTableSchemaWithoutPartitionColumns: StructType,
+      numUserSpecifiedColumns: Int): Project = {
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
-      getDefaultExpressionsForInsert(numQueryOutputs, schema)
+      getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns)
     val newAliases: Seq[NamedExpression] =
       newDefaultExpressions.zip(schema.fields).map {
         case (expr, field) => Alias(expr, field.name)()
@@ -322,9 +323,13 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    * This is a helper for the addMissingDefaultValuesForInsertFromInlineTable methods above.
    */
   private def getDefaultExpressionsForInsert(
-      numQueryOutputs: Int,
-      schema: StructType): Seq[Expression] = {
-    val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
+      schema: StructType,
+      numUserSpecifiedColumns: Int): Seq[Expression] = {
+    val remainingFields: Seq[StructField] = if (numUserSpecifiedColumns > 0) {
+      schema.fields.drop(numUserSpecifiedColumns)
+    } else {
+      Seq.empty
+    }
     val numDefaultExpressionsToAdd = getStructFieldsForDefaultExpressions(remainingFields).size
     Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2328817a2d4..21bf571c494 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3115,13 +3115,12 @@ object SQLConf {
   val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES =
     buildConf("spark.sql.defaultColumn.useNullsForMissingDefaultValues")
       .internal()
-      .doc("When true, and DEFAULT columns are enabled, allow column definitions lacking " +
-        "explicit default values to behave as if they had specified DEFAULT NULL instead. " +
-        "For example, this allows most INSERT INTO statements to specify only a prefix of the " +
-        "columns in the target table, and the remaining columns will receive NULL values.")
+      .doc("When true, and DEFAULT columns are enabled, allow INSERT INTO commands with user-" +
+        "specified lists of fewer columns than the target table to behave as if they had " +
+        "specified DEFAULT for all remaining columns instead, in order.")
       .version("3.4.0")
       .booleanConf
-      .createWithDefault(false)
+      .createWithDefault(true)
 
   val SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION =
     buildConf("spark.sql.legacy.skipTypeValidationOnAlterPartition")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 24ebfd75bd5..1997fce0f5c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -207,7 +207,7 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
 
     withTable("t1") {
       createTable("t1", cols, Seq.fill(4)("int"))
-      val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)"))
+      val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 values(1)"))
       assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") ||
         e1.getMessage.contains("expected 4 columns but found 1") ||
         e1.getMessage.contains("not enough data columns") ||
@@ -217,7 +217,7 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
     withTable("t1") {
       createTable("t1", cols, Seq.fill(4)("int"), cols.takeRight(2))
       val e1 = intercept[AnalysisException] {
-        sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
+        sql(s"INSERT INTO t1 partition(c3=3, c4=4) values(1)")
       }
       assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 3") ||
         e1.getMessage.contains("not enough data columns") ||
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 73bfeafdcdf..5b7acda6fc3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -1093,8 +1093,8 @@ class DataSourceV2SQLSuiteV1Filter
       verifyTable(t1, df)
       // Missing columns
       assert(intercept[AnalysisException] {
-        sql(s"INSERT INTO $t1(data) VALUES(4)")
-      }.getMessage.contains("Cannot find data for output column 'id'"))
+        sql(s"INSERT INTO $t1 VALUES(4)")
+      }.getMessage.contains("not enough data columns"))
       // Duplicate columns
       checkError(
         exception = intercept[AnalysisException] {
@@ -1121,9 +1121,9 @@ class DataSourceV2SQLSuiteV1Filter
       verifyTable(t1, Seq((3L, "c")).toDF("id", "data"))
       // Missing columns
       assert(intercept[AnalysisException] {
-        sql(s"INSERT OVERWRITE $t1(data) VALUES(4)")
-      }.getMessage.contains("Cannot find data for output column 'id'"))
-      // Duplicate columns
+        sql(s"INSERT OVERWRITE $t1 VALUES(4)")
+      }.getMessage.contains("not enough data columns"))
+    // Duplicate columns
       checkError(
         exception = intercept[AnalysisException] {
           sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)")
@@ -1150,9 +1150,9 @@ class DataSourceV2SQLSuiteV1Filter
       verifyTable(t1, Seq((1L, "c", "e"), (2L, "b", "d")).toDF("id", "data", "data2"))
       // Missing columns
       assert(intercept[AnalysisException] {
-        sql(s"INSERT OVERWRITE $t1(data, id) VALUES('a', 4)")
-      }.getMessage.contains("Cannot find data for output column 'data2'"))
-      // Duplicate columns
+        sql(s"INSERT OVERWRITE $t1 VALUES('a', 4)")
+      }.getMessage.contains("not enough data columns"))
+    // Duplicate columns
       checkError(
         exception = intercept[AnalysisException] {
           sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index cc1d4ab3fcd..c24b77f014c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -864,7 +864,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
 
   test("Allow user to insert specified columns into insertable view") {
     withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
-      sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+      sql("INSERT OVERWRITE TABLE jsonTable SELECT a, DEFAULT FROM jt")
       checkAnswer(
         sql("SELECT a, b FROM jsonTable"),
         (1 to 10).map(i => Row(i, null))
@@ -884,7 +884,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
 
     val message = intercept[AnalysisException] {
-      sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
+      sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
     }.getMessage
     assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
   }
@@ -896,7 +896,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       withTable("t") {
         sql("create table t(i boolean, s bigint) using parquet")
-        sql("insert into t values(true)")
+        sql("insert into t(i) values(true)")
         checkAnswer(spark.table("t"), Row(true, null))
       }
     }
@@ -915,13 +915,13 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     // The default value parses correctly and the provided value type is different but coercible.
     withTable("t") {
       sql("create table t(i boolean, s bigint default 42) using parquet")
-      sql("insert into t values(false)")
+      sql("insert into t(i) values(false)")
       checkAnswer(spark.table("t"), Row(false, 42L))
     }
     // There are two trailing default values referenced implicitly by the INSERT INTO statement.
     withTable("t") {
       sql("create table t(i int, s bigint default 42, x bigint default 43) using parquet")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(sql("select s + x from t where i = 1"), Seq(85L).map(i => Row(i)))
     }
     // The table has a partitioning column and a default value is injected.
@@ -933,7 +933,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     // The table has a partitioning column and a default value is added per an explicit reference.
     withTable("t") {
       sql("create table t(i boolean, s bigint default 42) using parquet partitioned by (i)")
-      sql("insert into t partition(i='true') values(default)")
+      sql("insert into t partition(i='true') (s) values(default)")
       checkAnswer(spark.table("t"), Row(42L, true))
     }
     // The default value parses correctly as a constant but non-literal expression.
@@ -946,7 +946,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     // to the INSERT INTO statement.
     withTable("t") {
       sql("create table t(i boolean default false, s bigint default 42) using parquet")
-      sql("insert into t values(false, default), (default, 42)")
+      sql("insert into t(i, s) values(false, default), (default, 42)")
       checkAnswer(spark.table("t"), Seq(Row(false, 42L), Row(false, 42L)))
     }
     // There is an explicit default value provided in the INSERT INTO statement in the VALUES,
@@ -1003,31 +1003,31 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         withTable("t1", "t2") {
           sql("create table t1(j int, s bigint default 42, x bigint default 43) using parquet")
           if (useDataFrames) {
-            Seq((1)).toDF.write.insertInto("t1")
-            Seq((2)).toDF.write.insertInto("t1")
-            Seq((3)).toDF.write.insertInto("t1")
-            Seq((4, 44)).toDF.write.insertInto("t1")
-            Seq((5, 44, 45)).toDF.write.insertInto("t1")
+            Seq((1, 42, 43)).toDF.write.insertInto("t1")
+            Seq((2, 42, 43)).toDF.write.insertInto("t1")
+            Seq((3, 42, 43)).toDF.write.insertInto("t1")
+            Seq((4, 44, 43)).toDF.write.insertInto("t1")
+            Seq((5, 44, 43)).toDF.write.insertInto("t1")
           } else {
-            sql("insert into t1 values(1)")
-            sql("insert into t1 values(2, default)")
-            sql("insert into t1 values(3, default, default)")
-            sql("insert into t1 values(4, 44)")
-            sql("insert into t1 values(5, 44, 45)")
+            sql("insert into t1(j) values(1)")
+            sql("insert into t1(j, s) values(2, default)")
+            sql("insert into t1(j, s, x) values(3, default, default)")
+            sql("insert into t1(j, s) values(4, 44)")
+            sql("insert into t1(j, s, x) values(5, 44, 45)")
           }
           sql("create table t2(j int, s bigint default 42, x bigint default 43) using parquet")
           if (useDataFrames) {
-            spark.table("t1").where("j = 1").select("j").write.insertInto("t2")
-            spark.table("t1").where("j = 2").select("j").write.insertInto("t2")
-            spark.table("t1").where("j = 3").select("j").write.insertInto("t2")
-            spark.table("t1").where("j = 4").select("j", "s").write.insertInto("t2")
-            spark.table("t1").where("j = 5").select("j", "s").write.insertInto("t2")
+            spark.table("t1").where("j = 1").write.insertInto("t2")
+            spark.table("t1").where("j = 2").write.insertInto("t2")
+            spark.table("t1").where("j = 3").write.insertInto("t2")
+            spark.table("t1").where("j = 4").write.insertInto("t2")
+            spark.table("t1").where("j = 5").write.insertInto("t2")
           } else {
-            sql("insert into t2 select j from t1 where j = 1")
-            sql("insert into t2 select j, default from t1 where j = 2")
-            sql("insert into t2 select j, default, default from t1 where j = 3")
-            sql("insert into t2 select j, s from t1 where j = 4")
-            sql("insert into t2 select j, s, default from t1 where j = 5")
+            sql("insert into t2(j) select j from t1 where j = 1")
+            sql("insert into t2(j, s) select j, default from t1 where j = 2")
+            sql("insert into t2(j, s, x) select j, default, default from t1 where j = 3")
+            sql("insert into t2(j, s) select j, s from t1 where j = 4")
+            sql("insert into t2(j, s, x) select j, s, default from t1 where j = 5")
           }
           checkAnswer(
             spark.table("t2"),
@@ -1128,7 +1128,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql("create table t(i int, s bigint default 42, x bigint) using parquet")
       assert(intercept[AnalysisException] {
         sql("insert into t values(1)")
-      }.getMessage.contains("expected 3 columns but found"))
+      }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
     }
     // The table has a partitioning column with a default value; this is not allowed.
     withTable("t") {
@@ -1185,15 +1185,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
             Row(4, 43, false),
             Row(4, 42, false)))
     }
-    // When the CASE_SENSITIVE configuration is disabled, then using different cases for the
-    // required and provided column names is successful.
-    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
-      withTable("t") {
-        sql("create table t(i boolean, s bigint default 42, q int default 43) using parquet")
-        sql("insert into t (I, Q) select true from (select 1)")
-        checkAnswer(spark.table("t"), Row(true, 42L, 43))
-      }
-    }
     // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no
     // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
     // values than expected, NULL values are appended in their place.
@@ -1230,29 +1221,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     val addOneColButExpectedTwo = "target table has 2 column(s) but the inserted data has 1 col"
     val addTwoColButExpectedThree = "target table has 3 column(s) but the inserted data has 2 col"
     // The missing columns in these INSERT INTO commands do not have explicit default values.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("insert into t (i) values (true)")
-      }.getMessage.contains(addOneColButExpectedTwo))
-    }
-    withTable("t") {
-      sql("create table t(i boolean default true, s bigint) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("insert into t (i) values (default)")
-      }.getMessage.contains(addOneColButExpectedTwo))
-    }
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 42) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("insert into t (s) values (default)")
-      }.getMessage.contains(addOneColButExpectedTwo))
-    }
     withTable("t") {
       sql("create table t(i boolean, s bigint, q int default 43) using parquet")
       assert(intercept[AnalysisException] {
         sql("insert into t (i, q) select true from (select 1)")
-      }.getMessage.contains(addTwoColButExpectedThree))
+      }.getMessage.contains("Cannot write to table due to mismatched user specified column " +
+        "size(3) and data column size(2)"))
     }
     // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no
     // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
@@ -1327,14 +1301,14 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql(createTableIntCol)
       sql("alter table t add column s bigint default 42")
       sql("alter table t add column x bigint default 43")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, 43))
     }
     // There are two trailing default values referenced implicitly by the INSERT INTO statement.
     withTable("t") {
       sql(createTableIntCol)
       sql("alter table t add columns s bigint default 42, x bigint default 43")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, 43))
     }
     // The table has a partitioning column and a default value is injected.
@@ -1348,8 +1322,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withTable("t") {
       sql(createTableBooleanCol)
       sql("alter table t add column s bigint default 41 + 1")
-      sql("insert into t values(false, default)")
-      checkAnswer(spark.table("t"), Row(false, 42))
+      sql("insert into t(i) values(default)")
+      checkAnswer(spark.table("t"), Row(null, 42))
     }
     // Explicit defaults may appear in different positions within the inline table provided as input
     // to the INSERT INTO statement.
@@ -1399,18 +1373,18 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         sql("create table t1(j int) using parquet")
         sql("alter table t1 add column s bigint default 42")
         sql("alter table t1 add column x bigint default 43")
-        sql("insert into t1 values(1)")
-        sql("insert into t1 values(2, default)")
-        sql("insert into t1 values(3, default, default)")
-        sql("insert into t1 values(4, 44)")
-        sql("insert into t1 values(5, 44, 45)")
+        sql("insert into t1(j) values(1)")
+        sql("insert into t1(j, s) values(2, default)")
+        sql("insert into t1(j, s, x) values(3, default, default)")
+        sql("insert into t1(j, s) values(4, 44)")
+        sql("insert into t1(j, s, x) values(5, 44, 45)")
         sql("create table t2(j int) using parquet")
         sql("alter table t2 add columns s bigint default 42, x bigint default 43")
-        sql("insert into t2 select j from t1 where j = 1")
-        sql("insert into t2 select j, default from t1 where j = 2")
-        sql("insert into t2 select j, default, default from t1 where j = 3")
-        sql("insert into t2 select j, s from t1 where j = 4")
-        sql("insert into t2 select j, s, default from t1 where j = 5")
+        sql("insert into t2(j) select j from t1 where j = 1")
+        sql("insert into t2(j, s) select j, default from t1 where j = 2")
+        sql("insert into t2(j, s, x) select j, default, default from t1 where j = 3")
+        sql("insert into t2(j, s) select j, s from t1 where j = 4")
+        sql("insert into t2(j, s, x) select j, s, default from t1 where j = 5")
         checkAnswer(
           spark.table("t2"),
           Row(1, 42L, 43L) ::
@@ -1472,7 +1446,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql("alter table t add column x bigint")
       assert(intercept[AnalysisException] {
         sql("insert into t values(1)")
-      }.getMessage.contains("expected 3 columns but found"))
+      }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
     }
   }
 
@@ -1555,11 +1529,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         useDataFrames: Boolean = false)
     def runTest(dataSource: String, config: Config): Unit = {
       def insertIntoT(): Unit = {
-        if (config.useDataFrames) {
-          Seq(("xyz", 42)).toDF.write.insertInto("t")
-        } else {
-          sql("insert into t values('xyz', 42)")
-        }
+        sql("insert into t(a, i) values('xyz', 42)")
       }
       def withTableT(f: => Unit): Unit = {
         sql(s"create table t(a string, i int) using $dataSource")
@@ -1897,11 +1867,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
                 array(
                   map(false, 'def', true, 'jkl'))))
               using ${config.dataSource}""")
-        if (config.useDataFrames) {
-          Seq((1)).toDF.write.insertInto("t")
-        } else {
-          sql("insert into t select 1, default")
-        }
+        sql("insert into t select 1, default")
         sql("alter table t alter column s drop default")
         if (config.useDataFrames) {
           Seq((2, null)).toDF.write.insertInto("t")
@@ -1916,11 +1882,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
                 struct(3, 4)),
               array(
                 map(false, 'mno', true, 'pqr')))""")
-        if (config.useDataFrames) {
-          Seq((3)).toDF.write.insertInto("t")
-        } else {
-          sql("insert into t select 3, default")
-        }
+        sql("insert into t select 3, default")
         sql(
           """
             alter table t
@@ -1928,11 +1890,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
               map<boolean, string>>
             default array(
               map(true, 'xyz'))""")
-        if (config.useDataFrames) {
-          Seq((4)).toDF.write.insertInto("t")
-        } else {
-          sql("insert into t select 4, default")
-        }
+        sql("insert into t(i, s) select 4, default")
         checkAnswer(spark.table("t"),
           Seq(
             Row(1,
@@ -2275,7 +2233,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       withTable("t1") {
         sql("CREATE TABLE t1(c1 int, c2 string, c3 int) using parquet")
-        sql("INSERT INTO TABLE t1 select * from jt where a=1")
+        sql("INSERT INTO TABLE t1(c1, c2) select * from jt where a=1")
         checkAnswer(spark.table("t1"), Row(1, "str1", null))
         sql("INSERT INTO TABLE t1 select *, 2 from jt where a=2")
         checkAnswer(spark.table("t1"), Seq(Row(1, "str1", null), Row(2, "str2", 2)))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org