Posted to reviews@spark.apache.org by "Hisoka-X (via GitHub)" <gi...@apache.org> on 2023/09/04 12:48:01 UTC

[GitHub] [spark] Hisoka-X commented on a diff in pull request #42802: [SPARK-43752][SQL] Support default column value on DataSource V2

Hisoka-X commented on code in PR #42802:
URL: https://github.com/apache/spark/pull/42802#discussion_r1314885709


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/V2WriteAnalysisSuite.scala:
##########
@@ -477,21 +477,6 @@ abstract class V2WriteAnalysisSuiteBase extends AnalysisTest {
     )
   }
 
-  test("byName: missing optional columns cause failure and are identified by name") {

Review Comment:
   This test case can no longer pass because missing optional columns are now filled with default column values instead of failing.
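   
   A minimal sketch of the behavior change, assuming a local SparkSession and the built-in parquet path; the table name and values are illustrative. It is a SQL-level analogue of why the removed analysis test can no longer pass: a write that omits an optional column is now resolved by filling in the column default (or NULL) rather than raising an analysis error.
   ```scala
   import org.apache.spark.sql.SparkSession

   val spark = SparkSession.builder().master("local[1]").getOrCreate()
   spark.sql("create table t(i int, s bigint default 42) using parquet")
   // Omitting the optional column is exactly what the removed test asserted
   // would fail; with default column values, `s` is now filled with 42.
   spark.sql("insert into t(i) values (1)")
   spark.sql("select * from t").show()  // expected row: 1, 42
   ```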



##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -1758,169 +1095,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
-  test("INSERT rows, ALTER TABLE ADD COLUMNS with DEFAULTs, then SELECT them") {
-    case class Config(
-        sqlConf: Option[(String, String)],
-        useDataFrames: Boolean = false)
-    def runTest(dataSource: String, config: Config): Unit = {
-      def insertIntoT(): Unit = {
-        sql("insert into t(a, i) values('xyz', 42)")
-      }
-      def withTableT(f: => Unit): Unit = {
-        sql(s"create table t(a string, i int) using $dataSource")
-        insertIntoT
-        withTable("t") { f }
-      }
-      // Positive tests:
-      // Adding a column with a valid default value into a table containing existing data works
-      // successfully. Querying data from the altered table returns the new value.
-      withTableT {
-        sql("alter table t add column (s string default concat('abc', 'def'))")
-        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef"))
-        checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
-        // Now alter the column to change the default value. This still returns the previous value,
-        // not the new value, since the behavior semantics are the same as if the first command had
-        // performed a backfill of the new default value in the existing rows.
-        sql("alter table t alter column s set default concat('ghi', 'jkl')")
-        checkAnswer(sql("select i, s from t"), Row(42, "abcdef"))
-      }
-      // Adding a column with a default value and then inserting explicit NULL values works.
-      // Querying data back from the table differentiates between the explicit NULL values and
-      // default values.
-      withTableT {
-        sql("alter table t add column (s string default concat('abc', 'def'))")
-        if (config.useDataFrames) {
-          Seq((null, null, null)).toDF.write.insertInto("t")
-        } else {
-          sql("insert into t values(null, null, null)")
-        }
-        sql("alter table t add column (x boolean default true)")
-        val insertedSColumn = null
-        checkAnswer(spark.table("t"),
-          Seq(
-            Row("xyz", 42, "abcdef", true),
-            Row(null, null, insertedSColumn, true)))
-        checkAnswer(sql("select i, s, x from t"),
-          Seq(
-            Row(42, "abcdef", true),
-            Row(null, insertedSColumn, true)))
-      }
-      // Adding two columns where only the first has a valid default value works successfully.
-      // Querying data from the altered table returns the default value as well as NULL for the
-      // second column.
-      withTableT {
-        sql("alter table t add column (s string default concat('abc', 'def'))")
-        sql("alter table t add column (x string)")
-        checkAnswer(spark.table("t"), Row("xyz", 42, "abcdef", null))
-        checkAnswer(sql("select i, s, x from t"), Row(42, "abcdef", null))
-      }
-      // Test other supported data types.
-      withTableT {
-        sql("alter table t add columns (" +
-          "s boolean default true, " +
-          "t byte default cast(null as byte), " +
-          "u short default cast(42 as short), " +
-          "v float default 0, " +
-          "w double default 0, " +
-          "x date default cast('2021-01-02' as date), " +
-          "y timestamp default cast('2021-01-02 01:01:01' as timestamp), " +
-          "z timestamp_ntz default cast('2021-01-02 01:01:01' as timestamp_ntz), " +
-          "a1 timestamp_ltz default cast('2021-01-02 01:01:01' as timestamp_ltz), " +
-          "a2 decimal(5, 2) default 123.45," +
-          "a3 bigint default 43," +
-          "a4 smallint default cast(5 as smallint)," +
-          "a5 tinyint default cast(6 as tinyint))")
-        insertIntoT()
-        // Manually inspect the result row values rather than using the 'checkAnswer' helper method
-        // in order to ensure the values' correctness while avoiding minor type incompatibilities.
-        val result: Array[Row] =
-          sql("select s, t, u, v, w, x, y, z, a1, a2, a3, a4, a5 from t").collect()
-        for (row <- result) {
-          assert(row.length == 13)
-          assert(row(0) == true)
-          assert(row(1) == null)
-          assert(row(2) == 42)
-          assert(row(3) == 0.0f)
-          assert(row(4) == 0.0d)
-          assert(row(5).toString == "2021-01-02")
-          assert(row(6).toString == "2021-01-02 01:01:01.0")
-          assert(row(7).toString.startsWith("2021-01-02"))
-          assert(row(8).toString == "2021-01-02 01:01:01.0")
-          assert(row(9).toString == "123.45")
-          assert(row(10) == 43L)
-          assert(row(11) == 5)
-          assert(row(12) == 6)
-        }
-      }
-    }
-
-    // This represents one test configuration over a data source.
-    case class TestCase(
-        dataSource: String,
-        configs: Seq[Config])
-    // Run the test several times using each configuration.
-    Seq(
-      TestCase(
-        dataSource = "csv",
-        Seq(
-          Config(
-            None),
-          Config(
-            Some(SQLConf.CSV_PARSER_COLUMN_PRUNING.key -> "false")))),
-      TestCase(
-        dataSource = "json",
-        Seq(
-          Config(
-            None),
-          Config(
-            Some(SQLConf.JSON_GENERATOR_IGNORE_NULL_FIELDS.key -> "false")))),
-      TestCase(
-        dataSource = "orc",
-        Seq(
-          Config(
-            None),
-          Config(
-            Some(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> "false")))),
-      TestCase(
-        dataSource = "parquet",
-        Seq(
-          Config(
-            None),
-          Config(
-            Some(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false"))))
-    ).foreach { testCase: TestCase =>
-      testCase.configs.foreach { config: Config =>
-        // Run the test twice, once using SQL for the INSERT operations and again using DataFrames.
-        for (useDataFrames <- Seq(false, true)) {
-          config.sqlConf.map { kv: (String, String) =>
-            withSQLConf(kv) {
-              // Run the test with the pair of custom SQLConf values.
-              runTest(testCase.dataSource, config.copy(useDataFrames = useDataFrames))
-            }
-          }.getOrElse {
-            // Run the test with default settings.
-            runTest(testCase.dataSource, config.copy(useDataFrames = useDataFrames))
-          }
-        }
-      }
-    }
-  }
-
-  test("SPARK-39985 Enable implicit DEFAULT column values in inserts from DataFrames") {
-    // Negative test: explicit column "default" references are not supported in write operations
-    // from DataFrames: since the operators are resolved one-by-one, any .select referring to
-    // "default" generates a "column not found" error before any following .insertInto.
-    withTable("t") {
-      sql(s"create table t(a string, i int default 42) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          Seq("xyz").toDF.select("value", "default").write.insertInto("t")
-        },
-        errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-        parameters = Map("objectName" -> "`default`", "proposal" -> "`value`"))
-    }
-  }
-
   test("SPARK-40001 JSON DEFAULT columns = JSON_GENERATOR_WRITE_NULL_IF_WITH_DEFAULT_VALUE off") {

Review Comment:
   JSON, Array, Struct, and Map default values do not yet work correctly on DS V2. We will fix that later.
   
   `SPARK-39359 Restrict DEFAULT columns to allowlist of supported data source types` also does not throw an error on DS V2; that will be fixed later as well.
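   
   A hedged sketch of the kind of complex-typed default at issue here, written in the style of this suite (`withTable`, `sql`, `checkAnswer`); the table name and values are illustrative. On the V1 path the array default is materialized for the missing column; on DS V2 this does not yet work correctly, which is why the JSON/Array/Struct/Map cases stay in `InsertSuite` for now.
   ```scala
   withTable("t") {
     sql("create table t(i int, a array<int> default array(1, 2, 3)) using parquet")
     sql("insert into t(i) values(0)")
     // Expected on the V1 path; not yet guaranteed on DS V2.
     checkAnswer(spark.table("t"), Row(0, Seq(1, 2, 3)))
   }
   ```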



##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -997,669 +997,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
-  test("SPARK-38336 INSERT INTO statements with tables with default columns: positive tests") {
-    // When the INSERT INTO statement provides fewer values than expected, NULL values are appended
-    // in their place.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint) using parquet")
-      sql("insert into t(i) values(true)")
-      checkAnswer(spark.table("t"), Row(true, null))
-    }
-    // The default value for the DEFAULT keyword is the NULL literal.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint) using parquet")
-      sql("insert into t values(true, default)")
-      checkAnswer(spark.table("t"), Row(true, null))
-    }
-    // There is a complex expression in the default value.
-    withTable("t") {
-      sql("create table t(i boolean, s string default concat('abc', 'def')) using parquet")
-      sql("insert into t values(true, default)")
-      checkAnswer(spark.table("t"), Row(true, "abcdef"))
-    }
-    // The default value parses correctly and the provided value type is different but coercible.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 42) using parquet")
-      sql("insert into t(i) values(false)")
-      checkAnswer(spark.table("t"), Row(false, 42L))
-    }
-    // There are two trailing default values referenced implicitly by the INSERT INTO statement.
-    withTable("t") {
-      sql("create table t(i int, s bigint default 42, x bigint default 43) using parquet")
-      sql("insert into t(i) values(1)")
-      checkAnswer(sql("select s + x from t where i = 1"), Seq(85L).map(i => Row(i)))
-    }
-    withTable("t") {
-      sql("create table t(i int, s bigint default 42, x bigint) using parquet")
-      sql("insert into t(i) values(1)")
-      checkAnswer(spark.table("t"), Row(1, 42L, null))
-    }
-    // The table has a partitioning column and a default value is injected.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint, q int default 42) using parquet partitioned by (i)")
-      sql("insert into t partition(i='true') values(5, default)")
-      checkAnswer(spark.table("t"), Row(5, 42, true))
-    }
-    // The table has a partitioning column and a default value is added per an explicit reference.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 42) using parquet partitioned by (i)")
-      sql("insert into t partition(i='true') (s) values(default)")
-      checkAnswer(spark.table("t"), Row(42L, true))
-    }
-    // The default value parses correctly as a constant but non-literal expression.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 41 + 1) using parquet")
-      sql("insert into t values(false, default)")
-      checkAnswer(spark.table("t"), Row(false, 42L))
-    }
-    // Explicit defaults may appear in different positions within the inline table provided as input
-    // to the INSERT INTO statement.
-    withTable("t") {
-      sql("create table t(i boolean default false, s bigint default 42) using parquet")
-      sql("insert into t(i, s) values(false, default), (default, 42)")
-      checkAnswer(spark.table("t"), Seq(Row(false, 42L), Row(false, 42L)))
-    }
-    // There is an explicit default value provided in the INSERT INTO statement in the VALUES,
-    // with an alias over the VALUES.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 42) using parquet")
-      sql("insert into t select * from values (false, default) as tab(col, other)")
-      checkAnswer(spark.table("t"), Row(false, 42L))
-    }
-    // The explicit default value arrives first before the other value.
-    withTable("t") {
-      sql("create table t(i boolean default false, s bigint) using parquet")
-      sql("insert into t values (default, 43)")
-      checkAnswer(spark.table("t"), Row(false, 43L))
-    }
-    // The 'create table' statement provides the default parameter first.
-    withTable("t") {
-      sql("create table t(i boolean default false, s bigint) using parquet")
-      sql("insert into t values (default, 43)")
-      checkAnswer(spark.table("t"), Row(false, 43L))
-    }
-    // The explicit default value is provided in the wrong order (first instead of second), but
-    // this is OK because the provided default value evaluates to literal NULL.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 42) using parquet")
-      sql("insert into t values (default, 43)")
-      checkAnswer(spark.table("t"), Row(null, 43L))
-    }
-    // There is an explicit default value provided in the INSERT INTO statement as a SELECT.
-    // This is supported.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 42) using parquet")
-      sql("insert into t select false, default")
-      checkAnswer(spark.table("t"), Row(false, 42L))
-    }
-    // There is a complex query plan in the SELECT query in the INSERT INTO statement.
-    withTable("t") {
-      sql("create table t(i boolean default false, s bigint default 42) using parquet")
-      sql("insert into t select col, count(*) from values (default, default) " +
-        "as tab(col, other) group by 1")
-      checkAnswer(spark.table("t"), Row(false, 1))
-    }
-    // The explicit default reference resolves successfully with nested table subqueries.
-    withTable("t") {
-      sql("create table t(i boolean default false, s bigint) using parquet")
-      sql("insert into t select * from (select * from values(default, 42))")
-      checkAnswer(spark.table("t"), Row(false, 42L))
-    }
-    // There are three column types exercising various combinations of implicit and explicit
-    // default column value references in the 'insert into' statements. Note these tests depend on
-    // enabling the configuration to use NULLs for missing DEFAULT column values.
-    for (useDataFrames <- Seq(false, true)) {
-      withTable("t1", "t2") {
-        sql("create table t1(j int, s bigint default 42, x bigint default 43) using parquet")
-        if (useDataFrames) {
-          Seq((1, 42, 43)).toDF.write.insertInto("t1")
-          Seq((2, 42, 43)).toDF.write.insertInto("t1")
-          Seq((3, 42, 43)).toDF.write.insertInto("t1")
-          Seq((4, 44, 43)).toDF.write.insertInto("t1")
-          Seq((5, 44, 43)).toDF.write.insertInto("t1")
-        } else {
-          sql("insert into t1(j) values(1)")
-          sql("insert into t1(j, s) values(2, default)")
-          sql("insert into t1(j, s, x) values(3, default, default)")
-          sql("insert into t1(j, s) values(4, 44)")
-          sql("insert into t1(j, s, x) values(5, 44, 45)")
-        }
-        sql("create table t2(j int, s bigint default 42, x bigint default 43) using parquet")
-        if (useDataFrames) {
-          spark.table("t1").where("j = 1").write.insertInto("t2")
-          spark.table("t1").where("j = 2").write.insertInto("t2")
-          spark.table("t1").where("j = 3").write.insertInto("t2")
-          spark.table("t1").where("j = 4").write.insertInto("t2")
-          spark.table("t1").where("j = 5").write.insertInto("t2")
-        } else {
-          sql("insert into t2(j) select j from t1 where j = 1")
-          sql("insert into t2(j, s) select j, default from t1 where j = 2")
-          sql("insert into t2(j, s, x) select j, default, default from t1 where j = 3")
-          sql("insert into t2(j, s) select j, s from t1 where j = 4")
-          sql("insert into t2(j, s, x) select j, s, default from t1 where j = 5")
-        }
-        checkAnswer(
-          spark.table("t2"),
-          Row(1, 42L, 43L) ::
-          Row(2, 42L, 43L) ::
-          Row(3, 42L, 43L) ::
-          Row(4, 44L, 43L) ::
-          Row(5, 44L, 43L) :: Nil)
-      }
-    }
-  }
-
-  test("SPARK-38336 INSERT INTO statements with tables with default columns: negative tests") {
-    // The default value fails to analyze.
-    withTable("t") {
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("create table t(i boolean, s bigint default badvalue) using parquet")
-        },
-        errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
-        parameters = Map(
-          "statement" -> "CREATE TABLE",
-          "colName" -> "`s`",
-          "defaultValue" -> "badvalue"))
-    }
-    // The default value analyzes to a table not in the catalog.
-    withTable("t") {
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("create table t(i boolean, s bigint default (select min(x) from badtable)) " +
-            "using parquet")
-        },
-        errorClass = "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
-        parameters = Map(
-          "statement" -> "CREATE TABLE",
-          "colName" -> "`s`",
-          "defaultValue" -> "(select min(x) from badtable)"))
-    }
-    // The default value parses but refers to a table from the catalog.
-    withTable("t", "other") {
-      sql("create table other(x string) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("create table t(i boolean, s bigint default (select min(x) from other)) " +
-            "using parquet")
-        },
-        errorClass = "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
-        parameters = Map(
-          "statement" -> "CREATE TABLE",
-          "colName" -> "`s`",
-          "defaultValue" -> "(select min(x) from other)"))
-    }
-    // The default value has an explicit alias. It fails to evaluate when inlined into the VALUES
-    // list at the INSERT INTO time.
-    withTable("t") {
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("create table t(i boolean default (select false as alias), s bigint) using parquet")
-        },
-        errorClass = "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
-        parameters = Map(
-          "statement" -> "CREATE TABLE",
-          "colName" -> "`i`",
-          "defaultValue" -> "(select false as alias)"))
-    }
-    // Explicit default values may not participate in complex expressions in the VALUES list.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 42) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("insert into t values(false, default + 1)")
-        },
-        errorClass = "DEFAULT_PLACEMENT_INVALID",
-        parameters = Map.empty
-      )
-    }
-    // Explicit default values may not participate in complex expressions in the SELECT query.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 42) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("insert into t select false, default + 1")
-        },
-        errorClass = "DEFAULT_PLACEMENT_INVALID",
-        parameters = Map.empty
-      )
-    }
-    // Explicit default values have a reasonable error path if the table is not found.
-    withTable("t") {
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("insert into t values(false, default)")
-        },
-        errorClass = "TABLE_OR_VIEW_NOT_FOUND",
-        parameters = Map("relationName" -> "`t`"),
-        context = ExpectedContext("t", 12, 12)
-      )
-    }
-    // The default value parses but the type is not coercible.
-    withTable("t") {
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("create table t(i boolean, s bigint default false) using parquet")
-        },
-        errorClass = "INVALID_DEFAULT_VALUE.DATA_TYPE",
-        parameters = Map(
-          "statement" -> "CREATE TABLE",
-          "colName" -> "`s`",
-          "expectedType" -> "\"BIGINT\"",
-          "defaultValue" -> "false",
-          "actualType" -> "\"BOOLEAN\""))
-    }
-    // The number of columns in the INSERT INTO statement is greater than the number of columns in
-    // the table.
-    withTable("t") {
-      sql("create table num_data(id int, val decimal(38,10)) using parquet")
-      sql("create table t(id1 int, int2 int, result decimal(38,10)) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val * t2.val " +
-            "from num_data t1, num_data t2")
-        },
-        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
-        parameters = Map(
-          "tableName" -> "`spark_catalog`.`default`.`t`",
-          "tableColumns" -> "`id1`, `int2`, `result`",
-          "dataColumns" -> "`id`, `id`, `val`, `val`, `(val * val)`"))
-    }
-    // The default value is disabled per configuration.
-    withTable("t") {
-      withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
-        checkError(
-          exception = intercept[ParseException] {
-            sql("create table t(i boolean, s bigint default 42L) using parquet")
-          },
-          errorClass = "UNSUPPORTED_DEFAULT_VALUE.WITH_SUGGESTION",
-          parameters = Map.empty,
-          context = ExpectedContext("s bigint default 42L", 26, 45)
-        )
-      }
-    }
-    // The table has a partitioning column with a default value; this is not allowed.
-    withTable("t") {
-      sql("create table t(i boolean default true, s bigint, q int default 42) " +
-        "using parquet partitioned by (i)")
-      checkError(
-        exception = intercept[ParseException] {
-          sql("insert into t partition(i=default) values(5, default)")
-        },
-        errorClass = "REF_DEFAULT_VALUE_IS_NOT_ALLOWED_IN_PARTITION",
-        parameters = Map.empty,
-        context = ExpectedContext(
-          fragment = "partition(i=default)",
-          start = 14,
-          stop = 33))
-    }
-    // The configuration option to append missing NULL values to the end of the INSERT INTO
-    // statement is not enabled.
-    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
-      withTable("t") {
-        sql("create table t(i boolean, s bigint) using parquet")
-        checkError(
-          exception = intercept[AnalysisException] {
-            sql("insert into t values(true)")
-          },
-          errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
-          parameters = Map(
-            "tableName" -> "`spark_catalog`.`default`.`t`",
-            "tableColumns" -> "`i`, `s`",
-            "dataColumns" -> "`col1`"))
-      }
-    }
-  }
-
-  test("SPARK-38795 INSERT INTO with user specified columns and defaults: positive tests") {
-    Seq(
-      "insert into t (i, s) values (true, default)",
-      "insert into t (s, i) values (default, true)",
-      "insert into t (i) values (true)",
-      "insert into t (i) values (default)",
-      "insert into t (s) values (default)",
-      "insert into t (s) select default from (select 1)",
-      "insert into t (i) select true from (select 1)"
-    ).foreach { insert =>
-      withTable("t") {
-        sql("create table t(i boolean default true, s bigint default 42) using parquet")
-        sql(insert)
-        checkAnswer(spark.table("t"), Row(true, 42L))
-      }
-    }
-    // The table is partitioned and we insert default values with explicit column names.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 4, q int default 42) using parquet " +
-        "partitioned by (i)")
-      sql("insert into t partition(i='true') (s) values(5)")
-      sql("insert into t partition(i='false') (q) select 43")
-      sql("insert into t partition(i='false') (q) select default")
-      checkAnswer(spark.table("t"),
-        Seq(Row(5, 42, true),
-            Row(4, 43, false),
-            Row(4, 42, false)))
-    }
-    // If no explicit DEFAULT value is available when the INSERT INTO statement provides fewer
-    // values than expected, NULL values are appended in their place.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint) using parquet")
-      sql("insert into t (i) values (true)")
-      checkAnswer(spark.table("t"), Row(true, null))
-    }
-    withTable("t") {
-      sql("create table t(i boolean default true, s bigint) using parquet")
-      sql("insert into t (i) values (default)")
-      checkAnswer(spark.table("t"), Row(true, null))
-    }
-    withTable("t") {
-      sql("create table t(i boolean, s bigint default 42) using parquet")
-      sql("insert into t (s) values (default)")
-      checkAnswer(spark.table("t"), Row(null, 42L))
-    }
-    withTable("t") {
-      sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
-      sql("insert into t partition(i='true') (s) values(5)")
-      sql("insert into t partition(i='false') (q) select 43")
-      sql("insert into t partition(i='false') (q) select default")
-      checkAnswer(spark.table("t"),
-        Seq(Row(5, null, true),
-          Row(null, 43, false),
-          Row(null, null, false)))
-    }
-  }
-
-  test("SPARK- 38795 INSERT INTO with user specified columns and defaults: negative tests") {
-    // The missing columns in these INSERT INTO commands do not have explicit default values.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint, q int default 43) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("insert into t (i, q) select true from (select 1)")
-        },
-        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
-        parameters = Map(
-          "tableName" -> "`spark_catalog`.`default`.`t`",
-          "tableColumns" -> "`i`, `q`",
-          "dataColumns" -> "`true`"))
-    }
-    // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no
-    // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
-    // values than expected, the INSERT INTO command fails to execute.
-    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
-      withTable("t") {
-        sql("create table t(i boolean, s bigint) using parquet")
-        checkError(
-          exception = intercept[AnalysisException] {
-            sql("insert into t (i) values (true)")
-          },
-          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
-          parameters = Map(
-            "tableName" -> "`spark_catalog`.`default`.`t`",
-            "colName" -> "`s`"))
-      }
-      withTable("t") {
-        sql("create table t(i boolean default true, s bigint) using parquet")
-        checkError(
-          exception = intercept[AnalysisException] {
-            sql("insert into t (i) values (default)")
-          },
-          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
-          parameters = Map(
-            "tableName" -> "`spark_catalog`.`default`.`t`",
-            "colName" -> "`s`"))
-      }
-      withTable("t") {
-        sql("create table t(i boolean, s bigint default 42) using parquet")
-        checkError(
-          exception = intercept[AnalysisException] {
-            sql("insert into t (s) values (default)")
-          },
-          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
-          parameters = Map(
-            "tableName" -> "`spark_catalog`.`default`.`t`",
-            "colName" -> "`i`"))
-      }
-      withTable("t") {
-        sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
-        checkError(
-          exception = intercept[AnalysisException] {
-            sql("insert into t partition(i='true') (s) values(5)")
-          },
-          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
-          parameters = Map(
-            "tableName" -> "`spark_catalog`.`default`.`t`",
-            "colName" -> "`q`"))
-      }
-      withTable("t") {
-        sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
-        checkError(
-          exception = intercept[AnalysisException] {
-            sql("insert into t partition(i='false') (q) select 43")
-          },
-          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
-          parameters = Map(
-            "tableName" -> "`spark_catalog`.`default`.`t`",
-            "colName" -> "`s`"))
-      }
-      withTable("t") {
-        sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
-        checkError(
-          exception = intercept[AnalysisException] {
-            sql("insert into t partition(i='false') (q) select default")
-          },
-          errorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
-          parameters = Map(
-            "tableName" -> "`spark_catalog`.`default`.`t`",
-            "colName" -> "`s`"))
-      }
-    }
-    // When the CASE_SENSITIVE configuration is enabled, then using different cases for the required
-    // and provided column names results in an analysis error.
-    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
-      withTable("t") {
-        sql("create table t(i boolean default true, s bigint default 42) using parquet")
-        checkError(
-          exception =
-            intercept[AnalysisException](sql("insert into t (I) select true from (select 1)")),
-          errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
-          sqlState = None,
-          parameters = Map("objectName" -> "`I`", "proposal" -> "`i`, `s`"),
-          context = ExpectedContext(
-            fragment = "insert into t (I)", start = 0, stop = 16))
-      }
-    }
-  }
-
-  test("SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Positive tests") {
-    // There is a complex expression in the default value.
-    val createTableBooleanCol = "create table t(i boolean) using parquet"
-    val createTableIntCol = "create table t(i int) using parquet"
-    withTable("t") {
-      sql(createTableBooleanCol)
-      sql("alter table t add column s string default concat('abc', 'def')")
-      sql("insert into t values(true, default)")
-      checkAnswer(spark.table("t"), Row(true, "abcdef"))
-    }
-    // There are two trailing default values referenced implicitly by the INSERT INTO statement.
-    withTable("t") {
-      sql(createTableIntCol)
-      sql("alter table t add column s bigint default 42")
-      sql("alter table t add column x bigint default 43")
-      sql("insert into t(i) values(1)")
-      checkAnswer(spark.table("t"), Row(1, 42, 43))
-    }
-    // There are two trailing default values referenced implicitly by the INSERT INTO statement.
-    withTable("t") {
-      sql(createTableIntCol)
-      sql("alter table t add columns s bigint default 42, x bigint default 43")
-      sql("insert into t(i) values(1)")
-      checkAnswer(spark.table("t"), Row(1, 42, 43))
-    }
-    withTable("t") {
-      sql(createTableIntCol)
-      sql("alter table t add column s bigint default 42")
-      sql("alter table t add column x bigint")
-      sql("insert into t(i) values(1)")
-      checkAnswer(spark.table("t"), Row(1, 42, null))
-    }
-    // The table has a partitioning column and a default value is injected.
-    withTable("t") {
-      sql("create table t(i boolean, s bigint) using parquet partitioned by (i)")
-      sql("alter table t add column q int default 42")
-      sql("insert into t partition(i='true') values(5, default)")
-      checkAnswer(spark.table("t"), Row(5, 42, true))
-    }
-    // The default value parses correctly as a constant but non-literal expression.
-    withTable("t") {
-      sql(createTableBooleanCol)
-      sql("alter table t add column s bigint default 41 + 1")
-      sql("insert into t(i) values(default)")
-      checkAnswer(spark.table("t"), Row(null, 42))
-    }
-    // Explicit defaults may appear in different positions within the inline table provided as input
-    // to the INSERT INTO statement.
-    withTable("t") {
-      sql("create table t(i boolean default false) using parquet")
-      sql("alter table t add column s bigint default 42")
-      sql("insert into t values(false, default), (default, 42)")
-      checkAnswer(spark.table("t"), Seq(Row(false, 42), Row(false, 42)))
-    }
-    // There is an explicit default value provided in the INSERT INTO statement in the VALUES,
-    // with an alias over the VALUES.
-    withTable("t") {
-      sql(createTableBooleanCol)
-      sql("alter table t add column s bigint default 42")
-      sql("insert into t select * from values (false, default) as tab(col, other)")
-      checkAnswer(spark.table("t"), Row(false, 42))
-    }
-    // The explicit default value is provided in the wrong order (first instead of second), but
-    // this is OK because the provided default value evaluates to literal NULL.
-    withTable("t") {
-      sql(createTableBooleanCol)
-      sql("alter table t add column s bigint default 42")
-      sql("insert into t values (default, 43)")
-      checkAnswer(spark.table("t"), Row(null, 43))
-    }
-    // There is an explicit default value provided in the INSERT INTO statement as a SELECT.
-    // This is supported.
-    withTable("t") {
-      sql(createTableBooleanCol)
-      sql("alter table t add column s bigint default 42")
-      sql("insert into t select false, default")
-      checkAnswer(spark.table("t"), Row(false, 42))
-    }
-    // There is a complex query plan in the SELECT query in the INSERT INTO statement.
-    withTable("t") {
-      sql("create table t(i boolean default false) using parquet")
-      sql("alter table t add column s bigint default 42")
-      sql("insert into t select col, count(*) from values (default, default) " +
-        "as tab(col, other) group by 1")
-      checkAnswer(spark.table("t"), Row(false, 1))
-    }
-    // There are three column types exercising various combinations of implicit and explicit
-    // default column value references in the 'insert into' statements. Note these tests depend on
-    // enabling the configuration to use NULLs for missing DEFAULT column values.
-    withTable("t1", "t2") {
-      sql("create table t1(j int) using parquet")
-      sql("alter table t1 add column s bigint default 42")
-      sql("alter table t1 add column x bigint default 43")
-      sql("insert into t1(j) values(1)")
-      sql("insert into t1(j, s) values(2, default)")
-      sql("insert into t1(j, s, x) values(3, default, default)")
-      sql("insert into t1(j, s) values(4, 44)")
-      sql("insert into t1(j, s, x) values(5, 44, 45)")
-      sql("create table t2(j int) using parquet")
-      sql("alter table t2 add columns s bigint default 42, x bigint default 43")
-      sql("insert into t2(j) select j from t1 where j = 1")
-      sql("insert into t2(j, s) select j, default from t1 where j = 2")
-      sql("insert into t2(j, s, x) select j, default, default from t1 where j = 3")
-      sql("insert into t2(j, s) select j, s from t1 where j = 4")
-      sql("insert into t2(j, s, x) select j, s, default from t1 where j = 5")
-      checkAnswer(
-        spark.table("t2"),
-        Row(1, 42L, 43L) ::
-        Row(2, 42L, 43L) ::
-        Row(3, 42L, 43L) ::
-        Row(4, 44L, 43L) ::
-        Row(5, 44L, 43L) :: Nil)
-    }
-  }
-
-  test("SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Negative tests") {
-    // The default value fails to analyze.
-    withTable("t") {
-      sql("create table t(i boolean) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("alter table t add column s bigint default badvalue")
-        },
-        errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
-        parameters = Map(
-          "statement" -> "ALTER TABLE ADD COLUMNS",
-          "colName" -> "`s`",
-          "defaultValue" -> "badvalue"))
-    }
-    // The default value analyzes to a table not in the catalog.
-    withTable("t") {
-      sql("create table t(i boolean) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("alter table t add column s bigint default (select min(x) from badtable)")
-        },
-        errorClass = "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
-        parameters = Map(
-          "statement" -> "ALTER TABLE ADD COLUMNS",
-          "colName" -> "`s`",
-          "defaultValue" -> "(select min(x) from badtable)"))
-    }
-    // The default value parses but refers to a table from the catalog.
-    withTable("t", "other") {
-      sql("create table other(x string) using parquet")
-      sql("create table t(i boolean) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("alter table t add column s bigint default (select min(x) from other)")
-        },
-        errorClass = "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
-        parameters = Map(
-          "statement" -> "ALTER TABLE ADD COLUMNS",
-          "colName" -> "`s`",
-          "defaultValue" -> "(select min(x) from other)"))
-    }
-    // The default value parses but the type is not coercible.
-    withTable("t") {
-      sql("create table t(i boolean) using parquet")
-      checkError(
-        exception = intercept[AnalysisException] {
-          sql("alter table t add column s bigint default false")
-        },
-        errorClass = "INVALID_DEFAULT_VALUE.DATA_TYPE",
-        parameters = Map(
-          "statement" -> "ALTER TABLE ADD COLUMNS",
-          "colName" -> "`s`",
-          "expectedType" -> "\"BIGINT\"",
-          "defaultValue" -> "false",
-          "actualType" -> "\"BOOLEAN\""))
-    }
-    // The default value is disabled per configuration.
-    withTable("t") {
-      withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
-        sql("create table t(i boolean) using parquet")
-        checkError(
-          exception = intercept[ParseException] {
-            sql("alter table t add column s bigint default 42L")
-          },
-          errorClass = "UNSUPPORTED_DEFAULT_VALUE.WITH_SUGGESTION",
-          parameters = Map.empty,
-          context = ExpectedContext(
-            fragment = "s bigint default 42L",
-            start = 25,
-            stop = 44)
-        )
-      }
-    }
-  }
-
   test("SPARK-38838 INSERT INTO with defaults set by ALTER TABLE ALTER COLUMN: positive tests") {
     withTable("t") {

Review Comment:
   We didn't move the `ALTER TABLE ALTER COLUMN` cases because their negative tests do not behave correctly on DataSource V2: altering a column to set an invalid default value does not throw an error right now. Once that is fixed, we can move them into `SQLInsertTestSuite`.
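   
   A hedged sketch of the negative case this refers to, in the style of the surrounding suite; the error class and parameter strings mirror the CREATE TABLE / ADD COLUMNS cases above and are illustrative for ALTER COLUMN. On the V1 path this throws; on DS V2 it currently does not, which is why these tests were not moved to `SQLInsertTestSuite` yet.
   ```scala
   withTable("t") {
     sql("create table t(i boolean, s bigint) using parquet")
     checkError(
       exception = intercept[AnalysisException] {
         sql("alter table t alter column s set default badvalue")
       },
       errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
       parameters = Map(
         "statement" -> "ALTER TABLE ALTER COLUMN",
         "colName" -> "`s`",
         "defaultValue" -> "badvalue"))
   }
   ```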



##########
sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala:
##########
@@ -341,6 +353,990 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("SPARK-38336 INSERT INTO statements with tables with default columns: positive tests") {
+    // When the INSERT INTO statement provides fewer values than expected, NULL values are appended
+    // in their place.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint) using parquet")
+      sql("insert into t(i) values(true)")
+      checkAnswer(spark.table("t"), Row(true, null))
+    }
+    // The default value for the DEFAULT keyword is the NULL literal.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint) using parquet")
+      sql("insert into t values(true, default)")
+      checkAnswer(spark.table("t"), Row(true, null))
+    }
+    // There is a complex expression in the default value.
+    withTable("t") {
+      sql("create table t(i boolean, s string default concat('abc', 'def')) using parquet")
+      sql("insert into t values(true, default)")
+      checkAnswer(spark.table("t"), Row(true, "abcdef"))
+    }
+    // The default value parses correctly and the provided value type is different but coercible.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      sql("insert into t(i) values(false)")
+      checkAnswer(spark.table("t"), Row(false, 42L))
+    }
+    // There are two trailing default values referenced implicitly by the INSERT INTO statement.
+    withTable("t") {
+      sql("create table t(i int, s bigint default 42, x bigint default 43) using parquet")
+      sql("insert into t(i) values(1)")
+      checkAnswer(sql("select s + x from t where i = 1"), Seq(85L).map(i => Row(i)))
+    }
+    withTable("t") {
+      sql("create table t(i int, s bigint default 42, x bigint) using parquet")
+      sql("insert into t(i) values(1)")
+      checkAnswer(spark.table("t"), Row(1, 42L, null))
+    }
+    // The table has a partitioning column and a default value is injected.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint, q int default 42) using parquet partitioned by (i)")
+      sql("insert into t partition(i='true') values(5, default)")
+      checkV1AndV2Answer(spark.table("t"), Row(5, 42, true), Row(true, 5, 42))
+    }
+    // The table has a partitioning column and a default value is added per an explicit reference.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet partitioned by (i)")
+      sql("insert into t partition(i='true') (s) values(default)")
+      checkV1AndV2Answer(spark.table("t"), Row(42L, true), Row(true, 42L))
+    }
+    // The default value parses correctly as a constant but non-literal expression.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 41 + 1) using parquet")
+      sql("insert into t values(false, default)")
+      checkAnswer(spark.table("t"), Row(false, 42L))
+    }
+    // Explicit defaults may appear in different positions within the inline table provided as input
+    // to the INSERT INTO statement.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint default 42) using parquet")
+      sql("insert into t(i, s) values(false, default), (default, 42)")
+      checkAnswer(spark.table("t"), Seq(Row(false, 42L), Row(false, 42L)))
+    }
+    // There is an explicit default value provided in the INSERT INTO statement in the VALUES,
+    // with an alias over the VALUES.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      sql("insert into t select * from values (false, default) as tab(col, other)")
+      checkAnswer(spark.table("t"), Row(false, 42L))
+    }
+    // The explicit default value arrives first before the other value.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint) using parquet")
+      sql("insert into t values (default, 43)")
+      checkAnswer(spark.table("t"), Row(false, 43L))
+    }
+    // The 'create table' statement provides the default parameter first.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint) using parquet")
+      sql("insert into t values (default, 43)")
+      checkAnswer(spark.table("t"), Row(false, 43L))
+    }
+    // The explicit default value is provided in the wrong order (first instead of second), but
+    // this is OK because the provided default value evaluates to literal NULL.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      sql("insert into t values (default, 43)")
+      checkAnswer(spark.table("t"), Row(null, 43L))
+    }
+    // There is an explicit default value provided in the INSERT INTO statement as a SELECT.
+    // This is supported.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      sql("insert into t select false, default")
+      checkAnswer(spark.table("t"), Row(false, 42L))
+    }
+    // There is a complex query plan in the SELECT query in the INSERT INTO statement.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint default 42) using parquet")
+      sql("insert into t select col, count(*) from values (default, default) " +
+        "as tab(col, other) group by 1")
+      checkAnswer(spark.table("t"), Row(false, 1))
+    }
+    // The explicit default reference resolves successfully with nested table subqueries.
+    withTable("t") {
+      sql("create table t(i boolean default false, s bigint) using parquet")
+      sql("insert into t select * from (select * from values(default, 42))")
+      checkAnswer(spark.table("t"), Row(false, 42L))
+    }
+    // There are three column types exercising various combinations of implicit and explicit
+    // default column value references in the 'insert into' statements. Note these tests depend on
+    // enabling the configuration to use NULLs for missing DEFAULT column values.
+    for (useDataFrames <- Seq(false, true)) {
+      withTable("t1", "t2") {
+        sql("create table t1(j int, s bigint default 42, x bigint default 43) using parquet")
+        if (useDataFrames) {
+          Seq((1, 42, 43)).toDF.write.insertInto("t1")
+          Seq((2, 42, 43)).toDF.write.insertInto("t1")
+          Seq((3, 42, 43)).toDF.write.insertInto("t1")
+          Seq((4, 44, 43)).toDF.write.insertInto("t1")
+          Seq((5, 44, 43)).toDF.write.insertInto("t1")
+        } else {
+          sql("insert into t1(j) values(1)")
+          sql("insert into t1(j, s) values(2, default)")
+          sql("insert into t1(j, s, x) values(3, default, default)")
+          sql("insert into t1(j, s) values(4, 44)")
+          sql("insert into t1(j, s, x) values(5, 44, 45)")
+        }
+        sql("create table t2(j int, s bigint default 42, x bigint default 43) using parquet")
+        if (useDataFrames) {
+          spark.table("t1").where("j = 1").write.insertInto("t2")
+          spark.table("t1").where("j = 2").write.insertInto("t2")
+          spark.table("t1").where("j = 3").write.insertInto("t2")
+          spark.table("t1").where("j = 4").write.insertInto("t2")
+          spark.table("t1").where("j = 5").write.insertInto("t2")
+        } else {
+          sql("insert into t2(j) select j from t1 where j = 1")
+          sql("insert into t2(j, s) select j, default from t1 where j = 2")
+          sql("insert into t2(j, s, x) select j, default, default from t1 where j = 3")
+          sql("insert into t2(j, s) select j, s from t1 where j = 4")
+          sql("insert into t2(j, s, x) select j, s, default from t1 where j = 5")
+        }
+        checkAnswer(
+          spark.table("t2"),
+          Row(1, 42L, 43L) ::
+            Row(2, 42L, 43L) ::
+            Row(3, 42L, 43L) ::
+            Row(4, 44L, 43L) ::
+            Row(5, 44L, 43L) :: Nil)
+      }
+    }
+  }
+
+  test("SPARK-38336 INSERT INTO statements with tables with default columns: negative tests") {
+    // The default value fails to analyze.
+    withTable("t") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("create table t(i boolean, s bigint default badvalue) using parquet")
+        },
+        errorClass = "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`s`",
+          "defaultValue" -> "badvalue"))
+    }
+    // The default value analyzes to a table not in the catalog.
+    withTable("t") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("create table t(i boolean, s bigint default (select min(x) from badtable)) " +
+            "using parquet")
+        },
+        errorClass = "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`s`",
+          "defaultValue" -> "(select min(x) from badtable)"))
+    }
+    // The default value parses but refers to a table from the catalog.
+    withTable("t", "other") {
+      sql("create table other(x string) using parquet")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("create table t(i boolean, s bigint default (select min(x) from other)) " +
+            "using parquet")
+        },
+        errorClass = "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`s`",
+          "defaultValue" -> "(select min(x) from other)"))
+    }
+    // The default value has an explicit alias. It fails to evaluate when inlined into the VALUES
+    // list at the INSERT INTO time.
+    withTable("t") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("create table t(i boolean default (select false as alias), s bigint) using parquet")
+        },
+        errorClass = "INVALID_DEFAULT_VALUE.SUBQUERY_EXPRESSION",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`i`",
+          "defaultValue" -> "(select false as alias)"))
+    }
+    // Explicit default values may not participate in complex expressions in the VALUES list.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values(false, default + 1)")
+        },
+        errorClass = "DEFAULT_PLACEMENT_INVALID",
+        parameters = Map.empty
+      )
+    }
+    // Explicit default values may not participate in complex expressions in the SELECT query.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t select false, default + 1")
+        },
+        errorClass = "DEFAULT_PLACEMENT_INVALID",
+        parameters = Map.empty
+      )
+    }
+    // Explicit default values have a reasonable error path if the table is not found.
+    withTable("t") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values(false, default)")
+        },
+        errorClass = "TABLE_OR_VIEW_NOT_FOUND",
+        parameters = Map("relationName" -> "`t`"),
+        context = ExpectedContext("t", 12, 12)
+      )
+    }
+    // The default value parses but the type is not coercible.
+    withTable("t") {
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("create table t(i boolean, s bigint default false) using parquet")
+        },
+        errorClass = "INVALID_DEFAULT_VALUE.DATA_TYPE",
+        parameters = Map(
+          "statement" -> "CREATE TABLE",
+          "colName" -> "`s`",
+          "expectedType" -> "\"BIGINT\"",
+          "defaultValue" -> "false",
+          "actualType" -> "\"BOOLEAN\""))
+    }
+    // The number of columns in the INSERT INTO statement is greater than the number of columns in
+    // the table.
+    withTable("t") {
+      sql("create table num_data(id int, val decimal(38,10)) using parquet")
+      sql("create table t(id1 int, int2 int, result decimal(38,10)) using parquet")
+      checkV1AndV2Error(
+        exception = intercept[AnalysisException] {
+          sql("insert into t select t1.id, t2.id, t1.val, t2.val, t1.val * t2.val " +
+            "from num_data t1, num_data t2")
+        },
+        v1ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+        v2ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.TOO_MANY_DATA_COLUMNS",
+        v1Parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`id1`, `int2`, `result`",
+          "dataColumns" -> "`id`, `id`, `val`, `val`, `(val * val)`"),
+        v2Parameters = Map(
+          "tableName" -> "`testcat`.`t`",
+          "tableColumns" -> "`id1`, `int2`, `result`",
+          "dataColumns" -> "`id`, `id`, `val`, `val`, `(val * val)`"))
+    }
+    // The default value is disabled per configuration.
+    withTable("t") {
+      withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+        checkError(
+          exception = intercept[ParseException] {
+            sql("create table t(i boolean, s bigint default 42L) using parquet")
+          },
+          errorClass = "UNSUPPORTED_DEFAULT_VALUE.WITH_SUGGESTION",
+          parameters = Map.empty,
+          context = ExpectedContext("s bigint default 42L", 26, 45)
+        )
+      }
+    }
+    // The table has a partitioning column with a default value; this is not allowed.
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint, q int default 42) " +
+        "using parquet partitioned by (i)")
+      checkError(
+        exception = intercept[ParseException] {
+          sql("insert into t partition(i=default) values(5, default)")
+        },
+        errorClass = "REF_DEFAULT_VALUE_IS_NOT_ALLOWED_IN_PARTITION",
+        parameters = Map.empty,
+        context = ExpectedContext(
+          fragment = "partition(i=default)",
+          start = 14,
+          stop = 33))
+    }
+    // The configuration option to append missing NULL values to the end of the INSERT INTO
+    // statement is not enabled.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        checkV1AndV2Error(
+          exception = intercept[AnalysisException] {
+            sql("insert into t values(true)")
+          },
+          v1ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+          v2ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+          v1Parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "tableColumns" -> "`i`, `s`",
+            "dataColumns" -> "`col1`"),
+          v2Parameters = Map(
+            "tableName" -> "`testcat`.`t`",
+            "tableColumns" -> "`i`, `s`",
+            "dataColumns" -> "`col1`"))
+      }
+    }
+  }
+
+  test("SPARK-38795 INSERT INTO with user specified columns and defaults: positive tests") {
+    Seq(
+      "insert into t (i, s) values (true, default)",
+      "insert into t (s, i) values (default, true)",
+      "insert into t (i) values (true)",
+      "insert into t (i) values (default)",
+      "insert into t (s) values (default)",
+      "insert into t (s) select default from (select 1)",
+      "insert into t (i) select true from (select 1)"
+    ).foreach { insert =>
+      withTable("t") {
+        sql("create table t(i boolean default true, s bigint default 42) using parquet")
+        sql(insert)
+        checkAnswer(spark.table("t"), Row(true, 42L))
+      }
+    }
+    // The table is partitioned and we insert default values with explicit column names.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 4, q int default 42) using parquet " +
+        "partitioned by (i)")
+      sql("insert into t partition(i='true') (s) values(5)")
+      sql("insert into t partition(i='false') (q) select 43")
+      sql("insert into t partition(i='false') (q) select default")
+      checkV1AndV2Answer(df = spark.table("t"),
+        v1ExpectedAnswer = Seq(Row(5, 42, true),
+          Row(4, 43, false),
+          Row(4, 42, false)),
+        v2ExpectedAnswer = Seq(Row(true, 5, 42),
+          Row(false, 4, 43),
+          Row(false, 4, 42)))
+    }
+    // If no explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+    // values than expected, NULL values are appended in their place.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint) using parquet")
+      sql("insert into t (i) values (true)")
+      checkAnswer(spark.table("t"), Row(true, null))
+    }
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint) using parquet")
+      sql("insert into t (i) values (default)")
+      checkAnswer(spark.table("t"), Row(true, null))
+    }
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42) using parquet")
+      sql("insert into t (s) values (default)")
+      checkAnswer(spark.table("t"), Row(null, 42L))
+    }
+    withTable("t") {
+      sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
+      sql("insert into t partition(i='true') (s) values(5)")
+      sql("insert into t partition(i='false') (q) select 43")
+      sql("insert into t partition(i='false') (q) select default")
+      checkV1AndV2Answer(df = spark.table("t"),
+        v1ExpectedAnswer = Seq(Row(5, null, true),
+          Row(null, 43, false),
+          Row(null, null, false)),
+        v2ExpectedAnswer = Seq(Row(true, 5, null),
+          Row(false, null, 43),
+          Row(false, null, null)))
+    }
+  }
+
+  test("SPARK- 38795 INSERT INTO with user specified columns and defaults: negative tests") {
+    // The missing columns in these INSERT INTO commands do not have explicit default values.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint, q int default 43) using parquet")
+      checkV1AndV2Error(
+        exception = intercept[AnalysisException] {
+          sql("insert into t (i, q) select true from (select 1)")
+        },
+        v1ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        v2ErrorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        v1Parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`i`, `q`",
+          "dataColumns" -> "`true`"),
+        v2Parameters = Map(
+          "tableName" -> "`testcat`.`t`",
+          "tableColumns" -> "`i`, `q`",
+          "dataColumns" -> "`true`"))
+    }
+    // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no
+    // explicit DEFAULT value is available when the INSERT INTO statement provides fewer
+    // values than expected, the INSERT INTO command fails to execute.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "false") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        checkV1AndV2Error(
+          exception = intercept[AnalysisException] {
+            sql("insert into t (i) values (true)")
+          },
+          v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v1Parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`s`"),
+          v2Parameters = Map(
+            "tableName" -> "`testcat`.`t`",
+            "colName" -> "`s`"))
+      }
+      withTable("t") {
+        sql("create table t(i boolean default true, s bigint) using parquet")
+        checkV1AndV2Error(
+          exception = intercept[AnalysisException] {
+            sql("insert into t (i) values (default)")
+          },
+          v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v1Parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`s`"),
+          v2Parameters = Map(
+            "tableName" -> "`testcat`.`t`",
+            "colName" -> "`s`"))
+      }
+      withTable("t") {
+        sql("create table t(i boolean, s bigint default 42) using parquet")
+        checkV1AndV2Error(
+          exception = intercept[AnalysisException] {
+            sql("insert into t (s) values (default)")
+          },
+          v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v1Parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`i`"),
+          v2Parameters = Map(
+            "tableName" -> "`testcat`.`t`",
+            "colName" -> "`i`"))
+      }
+      withTable("t") {
+        sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
+        checkV1AndV2Error(
+          exception = intercept[AnalysisException] {
+            sql("insert into t partition(i='true') (s) values(5)")
+          },
+          v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v1Parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`q`"),
+          v2Parameters = Map(
+            "tableName" -> "`testcat`.`t`",
+            "colName" -> "`q`"))
+      }
+      withTable("t") {
+        sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
+        checkV1AndV2Error(
+          exception = intercept[AnalysisException] {
+            sql("insert into t partition(i='false') (q) select 43")
+          },
+          v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v1Parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`s`"),
+          v2Parameters = Map(
+            "tableName" -> "`testcat`.`t`",
+            "colName" -> "`s`"))
+      }
+      withTable("t") {
+        sql("create table t(i boolean, s bigint, q int) using parquet partitioned by (i)")
+        checkV1AndV2Error(
+          exception = intercept[AnalysisException] {
+            sql("insert into t partition(i='false') (q) select default")
+          },
+          v1ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v2ErrorClass = "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA",
+          v1Parameters = Map(
+            "tableName" -> "`spark_catalog`.`default`.`t`",
+            "colName" -> "`s`"),
+          v2Parameters = Map(
+            "tableName" -> "`testcat`.`t`",
+            "colName" -> "`s`"))
+      }
+    }
+    // When the CASE_SENSITIVE configuration is enabled, using different cases for the required
+    // and provided column names results in an analysis error.
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      withTable("t") {
+        sql("create table t(i boolean default true, s bigint default 42) using parquet")
+        checkError(
+          exception =
+            intercept[AnalysisException](sql("insert into t (I) select true from (select 1)")),
+          errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+          sqlState = None,
+          parameters = Map("objectName" -> "`I`", "proposal" -> "`i`, `s`"),
+          context = ExpectedContext(
+            fragment = "insert into t (I)", start = 0, stop = 16))
+      }
+    }
+  }
+
+  test("SPARK-38811 INSERT INTO on columns added with ALTER TABLE ADD COLUMNS: Positive tests") {
+    withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> "parquet, ") {

Review Comment:
   We use a small trick from https://github.com/apache/spark/pull/36771#discussion_r901000165 to pass the provider checks.
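
   For context, a minimal, hypothetical sketch of why the trailing ", " in the
   DEFAULT_COLUMN_ALLOWED_PROVIDERS value matters, assuming the allow-list is parsed by splitting
   the configured string on commas and trimming each entry (the actual check inside Spark may
   differ in detail): the trailing comma leaves an empty entry in the list, which a test catalog
   whose tables report no provider name can then match.

       import java.util.Locale

       object AllowedProvidersSketch {
         // Hypothetical parsing of the allow-list: split on commas and normalize each entry.
         def parseAllowList(confValue: String): List[String] =
           confValue.split(",").map(_.trim.toLowerCase(Locale.ROOT)).toList

         def main(args: Array[String]): Unit = {
           // "parquet, " yields List("parquet", "") -- the empty entry is what a provider-less
           // test table can match, so the default-column provider check passes.
           println(parseAllowList("parquet, "))   // List(parquet, )
           println(parseAllowList("parquet"))     // List(parquet)
         }
       }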



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

