You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ge...@apache.org on 2022/08/18 21:38:04 UTC

[spark] branch master updated: Revert "[SPARK-40000][SQL] Update INSERTs without user-specified fields to not automatically add default values"

This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 50c163578cf Revert "[SPARK-40000][SQL] Update INSERTs without user-specified fields to not automatically add default values"
50c163578cf is described below

commit 50c163578cfef79002fbdbc54b3b8fc10cfbcf65
Author: Daniel Tenedorio <da...@databricks.com>
AuthorDate: Thu Aug 18 14:37:38 2022 -0700

    Revert "[SPARK-40000][SQL] Update INSERTs without user-specified fields to not automatically add default values"
    
    ### What changes were proposed in this pull request?
    
    Revert PR 37430 in commit 13c1b594a8ee6b99544572864b378b3616ffdb58 (Update INSERTs without user-specified fields to not automatically add default values).
    
    This is a clean revert, undoing the original changes from that PR exactly.
    
    ### Why are the changes needed?
    
    Upon further review, we find that the ability to switch between the new configuration and the old one is not needed and adds complexity.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Unit test coverage.1
    
    Closes #37572 from dtenedor/revert-pr37430.
    
    Authored-by: Daniel Tenedorio <da...@databricks.com>
    Signed-off-by: Gengliang Wang <ge...@apache.org>
---
 docs/sql-migration-guide.md                        |   1 -
 .../catalyst/analysis/ResolveDefaultColumns.scala  |  42 ++--
 .../org/apache/spark/sql/internal/SQLConf.scala    |  29 ++-
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  |  57 +++--
 .../org/apache/spark/sql/sources/InsertSuite.scala | 231 ++++++++++-----------
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  12 +-
 6 files changed, 178 insertions(+), 194 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index c19069cfdba..42df05f7f70 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -26,7 +26,6 @@ license: |
   
   - Since Spark 3.4, Number or Number(\*) from Teradata will be treated as Decimal(38,18). In Spark 3.3 or earlier, Number or Number(\*) from Teradata will be treated as Decimal(38, 0), in which case the fractional part will be removed.
   - Since Spark 3.4, v1 database, table, permanent view and function identifier will include 'spark_catalog' as the catalog name if database is defined, e.g. a table identifier will be: `spark_catalog.default.t`. To restore the legacy behavior, set `spark.sql.legacy.v1IdentifierNoCatalog` to `true`.
-  - Since Spark 3.4, `INSERT INTO` commands will now support user-specified column lists comprising fewer columns than present in the target table (for example, `INSERT INTO t (a, b) VALUES (1, 2)` where table `t` has three columns). In this case, Spark will insert `NULL` into the remaining columns in the row, or the explicit `DEFAULT` value if assigned to the column. To revert to the previous behavior, please set `spark.sql.defaultColumn.addMissingValuesForInsertsWithExplicitColumns` to false.
   - Since Spark 3.4, when ANSI SQL mode(configuration `spark.sql.ansi.enabled`) is on, Spark SQL always returns NULL result on getting a map value with a non-existing key. In Spark 3.3 or earlier, there will be an error.
 
 ## Upgrading from Spark SQL 3.2 to 3.3
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
index 20ca3c9532a..b7c7f0d3772 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala
@@ -108,7 +108,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
       val regenerated: InsertIntoStatement =
         regenerateUserSpecifiedCols(i, schema)
       val expanded: LogicalPlan =
-        addMissingDefaultValuesForInsertFromInlineTable(node, schema, i.userSpecifiedCols.length)
+        addMissingDefaultValuesForInsertFromInlineTable(node, schema)
       val replaced: Option[LogicalPlan] =
         replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
       replaced.map { r: LogicalPlan =>
@@ -132,7 +132,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
       val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema)
       val project: Project = i.query.asInstanceOf[Project]
       val expanded: Project =
-        addMissingDefaultValuesForInsertFromProject(project, schema, i.userSpecifiedCols.length)
+        addMissingDefaultValuesForInsertFromProject(project, schema)
       val replaced: Option[LogicalPlan] =
         replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded)
       replaced.map { r =>
@@ -265,15 +265,14 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    */
   private def addMissingDefaultValuesForInsertFromInlineTable(
       node: LogicalPlan,
-      insertTableSchemaWithoutPartitionColumns: StructType,
-      numUserSpecifiedFields: Int): LogicalPlan = {
+      insertTableSchemaWithoutPartitionColumns: StructType): LogicalPlan = {
     val numQueryOutputs: Int = node match {
       case table: UnresolvedInlineTable => table.rows(0).size
       case local: LocalRelation => local.data(0).numFields
     }
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
-      getDefaultExpressionsForInsert(numQueryOutputs, schema, numUserSpecifiedFields, node)
+      getDefaultExpressionsForInsert(numQueryOutputs, schema)
     val newNames: Seq[String] = schema.fields.drop(numQueryOutputs).map { _.name }
     node match {
       case _ if newDefaultExpressions.isEmpty => node
@@ -299,12 +298,11 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    */
   private def addMissingDefaultValuesForInsertFromProject(
       project: Project,
-      insertTableSchemaWithoutPartitionColumns: StructType,
-      numUserSpecifiedFields: Int): Project = {
+      insertTableSchemaWithoutPartitionColumns: StructType): Project = {
     val numQueryOutputs: Int = project.projectList.size
     val schema = insertTableSchemaWithoutPartitionColumns
     val newDefaultExpressions: Seq[Expression] =
-      getDefaultExpressionsForInsert(numQueryOutputs, schema, numUserSpecifiedFields, project)
+      getDefaultExpressionsForInsert(numQueryOutputs, schema)
     val newAliases: Seq[NamedExpression] =
       newDefaultExpressions.zip(schema.fields).map {
         case (expr, field) => Alias(expr, field.name)()
@@ -317,19 +315,20 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
    */
   private def getDefaultExpressionsForInsert(
       numQueryOutputs: Int,
-      schema: StructType,
-      numUserSpecifiedFields: Int,
-      treeNode: LogicalPlan): Seq[Expression] = {
-    if (numUserSpecifiedFields > 0 && numUserSpecifiedFields != numQueryOutputs) {
-      throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
-        numUserSpecifiedFields, numQueryOutputs, treeNode)
-    }
-    if (numUserSpecifiedFields > 0 && SQLConf.get.addMissingValuesForInsertsWithExplicitColumns) {
-      val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
-      val numDefaultExpressionsToAdd = remainingFields.size
-      Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
+      schema: StructType): Seq[Expression] = {
+    val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs)
+    val numDefaultExpressionsToAdd = getStructFieldsForDefaultExpressions(remainingFields).size
+    Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME))
+  }
+
+  /**
+   * This is a helper for the getDefaultExpressionsForInsert methods above.
+   */
+  private def getStructFieldsForDefaultExpressions(fields: Seq[StructField]): Seq[StructField] = {
+    if (SQLConf.get.useNullsForMissingDefaultColumnValues) {
+      fields
     } else {
-      Seq.empty[Expression]
+      fields.takeWhile(_.metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY))
     }
   }
 
@@ -488,7 +487,8 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl
       schema.fields.filter {
         field => !userSpecifiedColNames.contains(field.name)
       }
-    Some(StructType(userSpecifiedFields ++ nonUserSpecifiedFields))
+    Some(StructType(userSpecifiedFields ++
+      getStructFieldsForDefaultExpressions(nonUserSpecifiedFields)))
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index cd0c0d25053..3ce6ee47958 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2924,18 +2924,6 @@ object SQLConf {
       .stringConf
       .createWithDefault("csv,json,orc,parquet")
 
-  val ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS =
-    buildConf("spark.sql.defaultColumn.addMissingValuesForInsertsWithExplicitColumns")
-      .internal()
-      .doc("When true, allow INSERT INTO commands with explicit columns (such as " +
-        "INSERT INTO t(a, b)) to specify fewer columns than the target table; the analyzer will " +
-        "assign default values for remaining columns (either NULL, or otherwise the explicit " +
-        "DEFAULT value associated with the column from a previous command). Otherwise, if " +
-        "false, return an error.")
-      .version("3.4.0")
-      .booleanConf
-      .createWithDefault(true)
-
   val JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE =
     buildConf("spark.sql.jsonGenerator.writeNullIfWithDefaultValue")
       .internal()
@@ -2948,6 +2936,17 @@ object SQLConf {
       .booleanConf
       .createWithDefault(true)
 
+  val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES =
+    buildConf("spark.sql.defaultColumn.useNullsForMissingDefaultValues")
+      .internal()
+      .doc("When true, and DEFAULT columns are enabled, allow column definitions lacking " +
+        "explicit default values to behave as if they had specified DEFAULT NULL instead. " +
+        "For example, this allows most INSERT INTO statements to specify only a prefix of the " +
+        "columns in the target table, and the remaining columns will receive NULL values.")
+      .version("3.4.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val ENFORCE_RESERVED_KEYWORDS = buildConf("spark.sql.ansi.enforceReservedKeywords")
     .doc(s"When true and '${ANSI_ENABLED.key}' is true, the Spark SQL parser enforces the ANSI " +
       "reserved keywords and forbids SQL queries that use reserved keywords as alias names " +
@@ -4531,12 +4530,12 @@ class SQLConf extends Serializable with Logging {
 
   def defaultColumnAllowedProviders: String = getConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS)
 
-  def addMissingValuesForInsertsWithExplicitColumns: Boolean =
-    getConf(SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS)
-
   def jsonWriteNullIfWithDefaultValue: Boolean =
     getConf(JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE)
 
+  def useNullsForMissingDefaultColumnValues: Boolean =
+    getConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES)
+
   def enforceReservedKeywords: Boolean = ansiEnabled && getConf(ENFORCE_RESERVED_KEYWORDS)
 
   def timestampType: AtomicType = getConf(TIMESTAMP_TYPE) match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index 9d5f75bad38..7fd6a5dbea0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -175,48 +175,45 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
   test("insert with column list - mismatched column list size") {
     val msgs = Seq("Cannot write to table due to mismatched user specified column size",
       "expected 3 columns but found")
-    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false",
-      SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+    def test: Unit = {
       withTable("t1") {
         val cols = Seq("c1", "c2", "c3")
         createTable("t1", cols, Seq("int", "long", "string"))
-        Seq(
-          "INSERT INTO t1 (c1, c2) values(1, 2, 3)",
-          "INSERT INTO t1 (c1, c2) select 1, 2, 3",
-          "INSERT INTO t1 (c1, c2, c3) values(1, 2)",
-          "INSERT INTO t1 (c1, c2, c3) select 1, 2"
-        ).foreach { query =>
-          val e = intercept[AnalysisException](sql(query))
-          assert(e.getMessage.contains(msgs(0)) || e.getMessage.contains(msgs(1)))
-        }
+        val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2) values(1, 2, 3)"))
+        assert(e1.getMessage.contains(msgs(0)) || e1.getMessage.contains(msgs(1)))
+        val e2 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1, c2, c3) values(1, 2)"))
+        assert(e2.getMessage.contains(msgs(0)) || e2.getMessage.contains(msgs(1)))
       }
     }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+      test
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+      test
+    }
   }
 
   test("insert with column list - mismatched target table out size after rewritten query") {
-    val v2Msg = "Cannot write to table due to mismatched user specified column size"
+    val v2Msg = "expected 2 columns but found"
     val cols = Seq("c1", "c2", "c3", "c4")
 
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "false") {
-      withTable("t1") {
-        createTable("t1", cols, Seq.fill(4)("int"))
-        val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)"))
-        assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") ||
-          e1.getMessage.contains("expected 4 columns but found 1") ||
-          e1.getMessage.contains("not enough data columns") ||
-          e1.getMessage.contains(v2Msg))
-      }
+    withTable("t1") {
+      createTable("t1", cols, Seq.fill(4)("int"))
+      val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)"))
+      assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") ||
+        e1.getMessage.contains("expected 4 columns but found 1") ||
+        e1.getMessage.contains("not enough data columns") ||
+        e1.getMessage.contains(v2Msg))
+    }
 
-      withTable("t1") {
-        createTable("t1", cols, Seq.fill(4)("int"), cols.takeRight(2))
-        val e1 = intercept[AnalysisException] {
-          sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
-        }
-        assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 3") ||
-          e1.getMessage.contains("not enough data columns") ||
-          e1.getMessage.contains(v2Msg))
+    withTable("t1") {
+      createTable("t1", cols, Seq.fill(4)("int"), cols.takeRight(2))
+      val e1 = intercept[AnalysisException] {
+        sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)")
       }
+      assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 3") ||
+        e1.getMessage.contains("not enough data columns") ||
+        e1.getMessage.contains(v2Msg))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index afe3ac6facc..3936f2b995c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -27,7 +27,6 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceUtils
@@ -864,8 +863,13 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("Allow user to insert specified columns into insertable view") {
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+      checkAnswer(
+        sql("SELECT a, b FROM jsonTable"),
+        (1 to 10).map(i => Row(i, null))
+      )
+
       sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
       checkAnswer(
         sql("SELECT a, b FROM jsonTable"),
@@ -878,9 +882,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         (1 to 10).map(i => Row(null, s"str$i"))
       )
     }
+
+    val message = intercept[AnalysisException] {
+      sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
+    }.getMessage
+    assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
   }
 
   test("SPARK-38336 INSERT INTO statements with tables with default columns: positive tests") {
+    // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no
+    // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+    // values than expected, NULL values are appended in their place.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        sql("insert into t values(true)")
+        checkAnswer(spark.table("t"), Row(true, null))
+      }
+    }
     // The default value for the DEFAULT keyword is the NULL literal.
     withTable("t") {
       sql("create table t(i boolean, s bigint) using parquet")
@@ -896,13 +915,13 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     // The default value parses correctly and the provided value type is different but coercible.
     withTable("t") {
       sql("create table t(i boolean, s bigint default 42) using parquet")
-      sql("insert into t (i) values(false)")
+      sql("insert into t values(false)")
       checkAnswer(spark.table("t"), Row(false, 42L))
     }
     // There are two trailing default values referenced implicitly by the INSERT INTO statement.
     withTable("t") {
       sql("create table t(i int, s bigint default 42, x bigint default 43) using parquet")
-      sql("insert into t(i) values(1)")
+      sql("insert into t values(1)")
       checkAnswer(sql("select s + x from t where i = 1"), Seq(85L).map(i => Row(i)))
     }
     // The table has a partitioning column and a default value is injected.
@@ -977,47 +996,37 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       checkAnswer(spark.table("t"), Row(false, 42L))
     }
     // There are three column types exercising various combinations of implicit and explicit
-    // default column value references in the 'insert into' statements.
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    // default column value references in the 'insert into' statements. Note these tests depend on
+    // enabling the configuration to use NULLs for missing DEFAULT column values.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       for (useDataFrames <- Seq(false, true)) {
         withTable("t1", "t2") {
           sql("create table t1(j int, s bigint default 42, x bigint default 43) using parquet")
           if (useDataFrames) {
-            Seq((1, 42, 43)).toDF.write.insertInto("t1")
-            Seq((2, 42, 43)).toDF.write.insertInto("t1")
-            Seq((3, 42, 43)).toDF.write.insertInto("t1")
-            Seq((4, 44, 43)).toDF.write.insertInto("t1")
+            Seq((1)).toDF.write.insertInto("t1")
+            Seq((2)).toDF.write.insertInto("t1")
+            Seq((3)).toDF.write.insertInto("t1")
+            Seq((4, 44)).toDF.write.insertInto("t1")
             Seq((5, 44, 45)).toDF.write.insertInto("t1")
           } else {
-            sql("insert into t1(j) values(1)")
-            sql("insert into t1(j, s) values(2, default)")
+            sql("insert into t1 values(1)")
+            sql("insert into t1 values(2, default)")
             sql("insert into t1 values(3, default, default)")
-            sql("insert into t1(j, s) values(4, 44)")
+            sql("insert into t1 values(4, 44)")
             sql("insert into t1 values(5, 44, 45)")
           }
           sql("create table t2(j int, s bigint default 42, x bigint default 43) using parquet")
           if (useDataFrames) {
-            spark.table("t1").where("j = 1").select("j")
-              .withColumns(Seq("s", "x"), Seq(Column(Literal(42)), Column(Literal(43))))
-              .write.insertInto("t2")
-            spark.table("t1").where("j = 2").select("j")
-              .withColumns(Seq("s", "x"), Seq(Column(Literal(42)), Column(Literal(43))))
-              .write.insertInto("t2")
-            spark.table("t1").where("j = 3").select("j")
-              .withColumns(Seq("s", "x"), Seq(Column(Literal(42)), Column(Literal(43))))
-              .write.insertInto("t2")
-            spark.table("t1").where("j = 4").select("j", "s")
-              .withColumn("x", Column(Literal(43)))
-              .write.insertInto("t2")
-            spark.table("t1").where("j = 5").select("j", "s")
-              .withColumn("x", Column(Literal(43)))
-              .write.insertInto("t2")
+            spark.table("t1").where("j = 1").select("j").write.insertInto("t2")
+            spark.table("t1").where("j = 2").select("j").write.insertInto("t2")
+            spark.table("t1").where("j = 3").select("j").write.insertInto("t2")
+            spark.table("t1").where("j = 4").select("j", "s").write.insertInto("t2")
+            spark.table("t1").where("j = 5").select("j", "s").write.insertInto("t2")
           } else {
-            sql("insert into t2(j) select j from t1 where j = 1")
-            sql("insert into t2(j, s) select j, default from t1 where j = 2")
+            sql("insert into t2 select j from t1 where j = 1")
+            sql("insert into t2 select j, default from t1 where j = 2")
             sql("insert into t2 select j, default, default from t1 where j = 3")
-            sql("insert into t2(j, s) select j, s from t1 where j = 4")
+            sql("insert into t2 select j, s from t1 where j = 4")
             sql("insert into t2 select j, s, default from t1 where j = 5")
           }
           checkAnswer(
@@ -1119,21 +1128,20 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql("create table t(i int, s bigint default 42, x bigint) using parquet")
       assert(intercept[AnalysisException] {
         sql("insert into t values(1)")
-      }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
+      }.getMessage.contains("expected 3 columns but found"))
     }
     // The table has a partitioning column with a default value; this is not allowed.
     withTable("t") {
       sql("create table t(i boolean default true, s bigint, q int default 42) " +
         "using parquet partitioned by (i)")
       assert(intercept[ParseException] {
-        sql("insert into t partition(i=default) (s, q) values(5, default)")
+        sql("insert into t partition(i=default) values(5, default)")
       }.getMessage.contains(
         "References to DEFAULT column values are not allowed within the PARTITION clause"))
     }
-    // The INSERT INTO statement has no user specified columns and fewer values than the number of
-    // columns in the target table.
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    // The configuration option to append missing NULL values to the end of the INSERT INTO
+    // statement is not enabled.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
       withTable("t") {
         sql("create table t(i boolean, s bigint) using parquet")
         assert(intercept[AnalysisException] {
@@ -1171,16 +1179,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
             Row(4, 43, false),
             Row(4, 42, false)))
     }
-    // When the INSERT INTO statement has user specified columns, and no explicit DEFAULT value is
-    // available when the INSERT INTO statement provides fewer values than expected, NULL values are
-    // appended in their place.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint) using parquet")
-      sql("insert into t (i) values (true)")
-      checkAnswer(spark.table("t"), Row(true, null))
+    // When the CASE_SENSITIVE configuration is disabled, then using different cases for the
+    // required and provided column names is successful.
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint default 42, q int default 43) using parquet")
+        sql("insert into t (I, Q) select true from (select 1)")
+        checkAnswer(spark.table("t"), Row(true, 42L, 43))
+      }
     }
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no
+    // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+    // values than expected, NULL values are appended in their place.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        sql("insert into t (i) values (true)")
+        checkAnswer(spark.table("t"), Row(true, null))
+      }
       withTable("t") {
         sql("create table t(i boolean default true, s bigint) using parquet")
         sql("insert into t (i) values (default)")
@@ -1211,71 +1227,65 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     withTable("t") {
       sql("create table t(i boolean, s bigint) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (true)")
+        sql("insert into t (i) values (true)")
       }.getMessage.contains(addOneColButExpectedTwo))
     }
     withTable("t") {
       sql("create table t(i boolean default true, s bigint) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default)")
+        sql("insert into t (i) values (default)")
       }.getMessage.contains(addOneColButExpectedTwo))
     }
     withTable("t") {
       sql("create table t(i boolean, s bigint default 42) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t values (default)")
+        sql("insert into t (s) values (default)")
       }.getMessage.contains(addOneColButExpectedTwo))
     }
     withTable("t") {
       sql("create table t(i boolean, s bigint, q int default 43) using parquet")
       assert(intercept[AnalysisException] {
-        sql("insert into t select true from (select 1)")
-      }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
-    }
-    withTable("t") {
-      sql("create table t(i boolean default true, s bigint default 42) using parquet")
-      assert(intercept[AnalysisException] {
-        sql("insert into t values (default)")
-      }.getMessage.contains("target table has 2 column(s) but the inserted data has 1 column(s)"))
+        sql("insert into t (i, q) select true from (select 1)")
+      }.getMessage.contains(addTwoColButExpectedThree))
     }
-    // When no explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+    // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no
+    // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
     // values than expected, the INSERT INTO command fails to execute.
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
       withTable("t") {
         sql("create table t(i boolean, s bigint) using parquet")
         assert(intercept[AnalysisException] {
-          sql("insert into t values (true)")
+          sql("insert into t (i) values (true)")
         }.getMessage.contains(addOneColButExpectedTwo))
       }
       withTable("t") {
         sql("create table t(i boolean default true, s bigint) using parquet")
         assert(intercept[AnalysisException] {
-          sql("insert into t values (default)")
+          sql("insert into t (i) values (default)")
         }.getMessage.contains(addOneColButExpectedTwo))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint default 42) using parquet")
         assert(intercept[AnalysisException] {
-          sql("insert into t values (default)")
+          sql("insert into t (s) values (default)")
         }.getMessage.contains(addOneColButExpectedTwo))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
         assert(intercept[AnalysisException] {
-          sql("insert into t partition(i='true') values(5)")
+          sql("insert into t partition(i='true') (s) values(5)")
         }.getMessage.contains(addTwoColButExpectedThree))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
         assert(intercept[AnalysisException] {
-          sql("insert into t partition(i='false') select 43")
+          sql("insert into t partition(i='false') (q) select 43")
         }.getMessage.contains(addTwoColButExpectedThree))
       }
       withTable("t") {
         sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
         assert(intercept[AnalysisException] {
-          sql("insert into t partition(i='false') select default")
+          sql("insert into t partition(i='false') (q) select default")
         }.getMessage.contains(addTwoColButExpectedThree))
       }
     }
@@ -1286,18 +1296,9 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         sql("create table t(i boolean default true, s bigint default 42) using parquet")
         assert(intercept[AnalysisException] {
           sql("insert into t (I) select true from (select 1)")
-        }.getMessage.contains("A column or function parameter with name `I` cannot be resolved"))
-      }
-    }
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "false") {
-      withTable("t") {
-        sql("create table t(i boolean, s bigint default 42) using parquet")
-        assert(intercept[AnalysisException] {
-          sql("insert into t(i) values (default)")
         }.getMessage.contains(
-          "Cannot write to table due to mismatched user specified column size(2) " +
-            "and data column size(1)"))
+          "[UNRESOLVED_COLUMN] A column or function parameter with name `I` cannot be resolved. " +
+            "Did you mean one of the following? [`i`, `s`]"))
       }
     }
   }
@@ -1317,14 +1318,14 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql(createTableIntCol)
       sql("alter table t add column s bigint default 42")
       sql("alter table t add column x bigint default 43")
-      sql("insert into t (i) values(1)")
+      sql("insert into t values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, 43))
     }
     // There are two trailing default values referenced implicitly by the INSERT INTO statement.
     withTable("t") {
       sql(createTableIntCol)
       sql("alter table t add columns s bigint default 42, x bigint default 43")
-      sql("insert into t (i) values(1)")
+      sql("insert into t values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, 43))
     }
     // The table has a partitioning column and a default value is injected.
@@ -1382,24 +1383,24 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       checkAnswer(spark.table("t"), Row(false, 1))
     }
     // There are three column types exercising various combinations of implicit and explicit
-    // default column value references in the 'insert into' statements.
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    // default column value references in the 'insert into' statements. Note these tests depend on
+    // enabling the configuration to use NULLs for missing DEFAULT column values.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       withTable("t1", "t2") {
         sql("create table t1(j int) using parquet")
         sql("alter table t1 add column s bigint default 42")
         sql("alter table t1 add column x bigint default 43")
-        sql("insert into t1(j) values(1)")
-        sql("insert into t1(j, s) values(2, default)")
+        sql("insert into t1 values(1)")
+        sql("insert into t1 values(2, default)")
         sql("insert into t1 values(3, default, default)")
-        sql("insert into t1(j, s) values(4, 44)")
+        sql("insert into t1 values(4, 44)")
         sql("insert into t1 values(5, 44, 45)")
         sql("create table t2(j int) using parquet")
         sql("alter table t2 add columns s bigint default 42, x bigint default 43")
-        sql("insert into t2(j) select j from t1 where j = 1")
-        sql("insert into t2(j, s) select j, default from t1 where j = 2")
+        sql("insert into t2 select j from t1 where j = 1")
+        sql("insert into t2 select j, default from t1 where j = 2")
         sql("insert into t2 select j, default, default from t1 where j = 3")
-        sql("insert into t2(j, s) select j, s from t1 where j = 4")
+        sql("insert into t2 select j, s from t1 where j = 4")
         sql("insert into t2 select j, s, default from t1 where j = 5")
         checkAnswer(
           spark.table("t2"),
@@ -1462,7 +1463,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql("alter table t add column x bigint")
       assert(intercept[AnalysisException] {
         sql("insert into t values(1)")
-      }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)"))
+      }.getMessage.contains("expected 3 columns but found"))
     }
   }
 
@@ -1538,13 +1539,16 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         insertNullsToStorage: Boolean = true,
         useDataFrames: Boolean = false)
     def runTest(dataSource: String, config: Config): Unit = {
-      def withTableT(f: => Unit): Unit = {
-        sql(s"create table t(a string, i int) using $dataSource")
+      def insertIntoT(): Unit = {
         if (config.useDataFrames) {
           Seq(("xyz", 42)).toDF.write.insertInto("t")
         } else {
-          sql("insert into t (a, i) values('xyz', 42)")
+          sql("insert into t values('xyz', 42)")
         }
+      }
+      def withTableT(f: => Unit): Unit = {
+        sql(s"create table t(a string, i int) using $dataSource")
+        insertIntoT
         withTable("t") { f }
       }
       // Positive tests:
@@ -1568,7 +1572,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         if (config.useDataFrames) {
           Seq((null, null, null)).toDF.write.insertInto("t")
         } else {
-          sql("insert into t (a, i, s) values(null, null, null)")
+          sql("insert into t values(null, null, null)")
         }
         sql("alter table t add column (x boolean default true)")
         // By default, INSERT commands into some tables (such as JSON) do not store NULL values.
@@ -1609,17 +1613,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
           "a3 bigint default 43," +
           "a4 smallint default cast(5 as smallint)," +
           "a5 tinyint default cast(6 as tinyint))")
-        if (config.useDataFrames) {
-          sql("select 'xyz', 42, true, cast(null as byte), cast(42 as short), 0, 0, " +
-            "cast('2021-01-02' as date), " +
-            "cast('2021-01-02 01:01:01' as timestamp), " +
-            "cast('2021-01-02 01:01:01' as timestamp_ntz), " +
-            "cast('2021-01-02 01:01:01' as timestamp_ltz), " +
-            "cast(123.45 as decimal(5, 2)), 43, cast(5 as smallint), cast(6 as tinyint)")
-            .write.insertInto("t")
-        } else {
-          sql("insert into t (a, i) values('xyz', 42)")
-        }
+        insertIntoT()
         // Manually inspect the result row values rather than using the 'checkAnswer' helper method
         // in order to ensure the values' correctness while avoiding minor type incompatibilities.
         val result: Array[Row] =
@@ -1891,15 +1885,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
                 array(
                   map(false, 'def', true, 'jkl'))))
               using ${config.dataSource}""")
-        def namedStructSql(colA: Integer, colB: Integer, key1: String, key2: String): String = {
-          "named_struct(" +
-            "'x', array(" +
-            s"named_struct('a', $colA, 'b', $colB)), " +
-            "'y', array(" +
-            s"map(false, '$key1', true, '$key2')))"
-        }
         if (config.useDataFrames) {
-          sql("select 1, " + namedStructSql(1, 2, "def", "jkl")).write.insertInto("t")
+          Seq((1)).toDF.write.insertInto("t")
         } else {
           sql("insert into t select 1, default")
         }
@@ -1918,7 +1905,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
               array(
                 map(false, 'mno', true, 'pqr')))""")
         if (config.useDataFrames) {
-          sql("select 3, " + namedStructSql(3, 4, "mno", "pqr")).write.insertInto("t")
+          Seq((3)).toDF.write.insertInto("t")
         } else {
           sql("insert into t select 3, default")
         }
@@ -1930,12 +1917,9 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
             default array(
               map(true, 'xyz'))""")
         if (config.useDataFrames) {
-          sql("select 4, " + namedStructSql(3, 4, "mno", "pqr") + "," +
-            "array(" +
-              "map(true, 'xyz'))")
-            .write.insertInto("t")
+          Seq((4)).toDF.write.insertInto("t")
         } else {
-          sql("insert into t(i, s) select 4, default")
+          sql("insert into t select 4, default")
         }
         checkAnswer(spark.table("t"),
           Seq(
@@ -2256,13 +2240,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       checkAnswer(spark.table("t1"), Row(1, "str1"))
     }
 
-    withSQLConf(
-      SQLConf.ADD_MISSING_DEFAULT_COLUMN_VALUES_FOR_INSERTS_WITH_EXPLICIT_COLUMNS.key -> "true") {
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       withTable("t1") {
         sql("CREATE TABLE t1(c1 int, c2 string, c3 int) using parquet")
-        sql("INSERT INTO TABLE t1 (c1, c2) select * from jt where a=1")
+        sql("INSERT INTO TABLE t1 select * from jt where a=1")
         checkAnswer(spark.table("t1"), Row(1, "str1", null))
-        sql("INSERT INTO TABLE t1  (c1, c2, c3) select *, 2 from jt where a=2")
+        sql("INSERT INTO TABLE t1 select *, 2 from jt where a=2")
         checkAnswer(spark.table("t1"), Seq(Row(1, "str1", null), Row(2, "str2", 2)))
       }
     }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index cfcc6b6b6b4..a7148e9c921 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -906,9 +906,15 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
         }
       }
     }
-
-    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false",
-      SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true") {
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+      testDefaultColumn
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+      SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+      testDefaultColumn
+    }
+    withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "true",
+      SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {
       testDefaultColumn
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org