Posted to reviews@spark.apache.org by "cloud-fan (via GitHub)" <gi...@apache.org> on 2023/05/22 16:44:05 UTC

[GitHub] [spark] cloud-fan opened a new pull request, #41262: [WIP] refactor default column value resolution

cloud-fan opened a new pull request, #41262:
URL: https://github.com/apache/spark/pull/41262

   
   ### What changes were proposed in this pull request?
   
   
   ### Why are the changes needed?
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   ### How was this patch tested?
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204275042


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -36,7 +37,9 @@ object TableOutputResolver {
       expected: Seq[Attribute],
       query: LogicalPlan,
       byName: Boolean,
-      conf: SQLConf): LogicalPlan = {
+      conf: SQLConf,
+      // TODO: Only DS v1 writing will set it to true. We should enable it for DS v2 as well.

Review Comment:
   This is an existing issue: default column values don't work for v2 inserts. I decided to fix it later, since the fix requires updating quite a few v2 tests.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204064492


##########
core/src/main/resources/error/error-classes.json:
##########
@@ -785,7 +785,18 @@
   },
   "INSERT_COLUMN_ARITY_MISMATCH" : {
     "message" : [
-      "<tableName> requires that the data to be inserted have the same number of columns as the target table: target table has <targetColumns> column(s) but the inserted data has <insertedColumns> column(s), including <staticPartCols> partition column(s) having constant value(s)."
+      "Cannot write to '<tableName>', <reason>:",

Review Comment:
   Unify the errors between v1 and v2 inserts.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204308382


##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -142,17 +142,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     )
   }
 
-  test("SELECT clause generating a different number of columns is not allowed.") {

Review Comment:
   It should be allowed. Judging by the tests, default value resolution was not triggered in some cases before this PR.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204281228


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala:
##########
@@ -164,7 +164,8 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
 
     case i @ InsertIntoStatement(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, query, overwrite, _) =>
+        l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, query, overwrite, _)
+        if query.resolved =>

Review Comment:
   This is needed because of https://github.com/apache/spark/pull/41262/files#r1204075426. It is now possible for the table to be resolved while the query is not.
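
To make the guard concrete, here is a minimal sketch using toy stand-ins. The `Plan`, `Relation`, `Query`, `Insert`, and `InsertCommand` types below are hypothetical and are not Spark's classes; the only point is that the pattern fires once the child query is resolved and otherwise leaves the node untouched for a later analyzer pass.

```scala
object ResolvedGuardSketch {
  // Hypothetical stand-ins for logical plan nodes; not Spark's real classes.
  sealed trait Plan { def resolved: Boolean }
  final case class Relation(name: String) extends Plan { val resolved = true }
  final case class Query(sql: String, resolved: Boolean) extends Plan
  final case class Insert(table: Plan, query: Plan) extends Plan {
    def resolved: Boolean = table.resolved && query.resolved
  }
  final case class InsertCommand(tableName: String, query: Plan) extends Plan {
    val resolved = true
  }

  // Convert the insert into a command only when the query is already resolved.
  def planInsert(plan: Plan): Plan = plan match {
    case Insert(Relation(name), query) if query.resolved => InsertCommand(name, query)
    case other => other // not ready yet; leave it for a later pass
  }
}
```

Without the guard, the first case would also match an insert whose query still contains an unresolved reference, such as a pending `DEFAULT` column.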





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204314226


##########
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala:
##########
@@ -1258,13 +1258,11 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
           """INSERT INTO TABLE dp_test PARTITION(dp)
             |SELECT key, value, key % 5 FROM src""".stripMargin)
       },
-      errorClass = "INSERT_COLUMN_ARITY_MISMATCH",
-      sqlState = "21S01",
+      errorClass = "_LEGACY_ERROR_TEMP_1169",

Review Comment:
   It's more accurate to report that the partition column list in INSERT does not match the actual table partition columns.





[GitHub] [spark] dtenedor commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1205887470


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keySet.map(normalizeFieldName)
+      // For INSERT with static partitions, such as `INSERT INTO t PARTITION(c=1) SELECT ...`, the
+      // input query schema should match the table schema excluding columns with static
+      // partition values.
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      // Normally, we should match the query schema with the table schema by position. If the n-th
+      // column of the query is the DEFAULT column, we should get the default value expression
+      // defined for the n-th column of the table. However, if the INSERT has a column list, such as
+      // `INSERT INTO t(b, c, a)`, the matching should be by name. For example, the first column of
+      // the query should match the column 'b' of the table.
+      // To simplify the implementation, `resolveColumnDefault` always does by-position match. If
+      // the INSERT has a column list, we reorder the table schema w.r.t. the column list and pass
+      // the reordered schema as the expected schema to `resolveColumnDefault`.
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  /**
+   * Resolves the column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]]. A column is a
+   * "DEFAULT" column if all the following conditions are met:
+   * 1. The expression inside project list or inline table expressions is a single
+   *    [[UnresolvedAttribute]] with name "DEFAULT". This means `SELECT DEFAULT, ...` is valid but
+   *    `SELECT DEFAULT + 1, ...` is not.
+   * 2. The project list or inline table expressions have less elements than the expected schema.
+   *    To find the default value definition, we need to find the matching column for expressions
+   *    inside project list or inline table expressions. This matching is by position and it
+   *    doesn't make sense if we have more expressions than the columns of expected schema.
+   * 3. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+   *    all unary nodes that inherit the output columns from its child.
+   * 4. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+   *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+   */
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
+
+      case p: Project if acceptProject && p.child.resolved &&
+          p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          p.projectList.length <= expectedQuerySchema.length =>
+        val newProjectList = p.projectList.zipWithIndex.map {
+          case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+            val field = expectedQuerySchema(i)

Review Comment:
   Optional: when I wrote the original `ResolveDefaultColumns` rule, I named this variable `insertTableSchemaWithoutPartitionColumns` because I frequently found myself confused when reading the variable name. We could rename this to `insertTargetTableSchema` to clarify it, or to `insertTargetTableSchemaWithoutPartitionColumns` if you don't think that's too verbose.
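
For readers following the quoted hunk, the reordering step it describes (match by position, after reordering the table schema by the INSERT column list) can be sketched in isolation. This is a simplified illustration: `Field` and `normalize` are hypothetical stand-ins for `StructField` and `normalizeFieldName`, and case-insensitive matching is an assumption.

```scala
import java.util.Locale

object ReorderSchemaSketch {
  // Hypothetical stand-in for a table column; not Spark's StructField.
  final case class Field(name: String, default: Option[String])

  // Assumed normalization: case-insensitive column names.
  private def normalize(name: String): String = name.toLowerCase(Locale.ROOT)

  // Returns the fields in column-list order, or None if any listed column is unknown.
  def reorder(schema: Seq[Field], userCols: Seq[String]): Option[Seq[Field]] = {
    val byName = schema.map(f => normalize(f.name) -> f).toMap
    val picked = userCols.map(c => byName.get(normalize(c)))
    if (picked.forall(_.isDefined)) Some(picked.flatten) else None
  }
}
```

With a column list of `(b, c, a)`, the first query column is then matched against `b` by position, which is the behavior the hunk's comment describes.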



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,166 @@
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))

Review Comment:
   optional: It looks like the only purpose of `acceptInlineTable` is setting it to false here in the event of a LIMIT and/or OFFSET and/or ORDER BY on top of a VALUES list. Do you think this check is strictly necessary? If not, we can simplify by removing `acceptInlineTable` as an argument to this function.
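
As an illustration of what the flag does (not an answer to whether it is needed), here is a toy sketch. The `Values`, `Limit`, and `Alias` nodes are hypothetical, and `resolve` only mimics the flag propagation in the quoted hunk: a pass-through node keeps the flag, while a limit-like node turns it off for everything below.

```scala
object AcceptFlagSketch {
  // Hypothetical toy plan nodes; not Spark's logical plan classes.
  sealed trait Node
  final case class Values(cells: Seq[String]) extends Node
  final case class Limit(child: Node) extends Node
  final case class Alias(child: Node) extends Node

  // Replaces "DEFAULT" cells with the positional default, unless a Limit sits above the Values.
  def resolve(node: Node, defaults: Seq[String], acceptInlineTable: Boolean = true): Node =
    node match {
      case Alias(child) => Alias(resolve(child, defaults, acceptInlineTable))
      case Limit(child) => Limit(resolve(child, defaults, acceptInlineTable = false))
      case Values(cells) if acceptInlineTable && cells.length <= defaults.length =>
        Values(cells.zipWithIndex.map {
          case ("DEFAULT", i) => defaults(i)
          case (cell, _) => cell
        })
      case other => other // left unresolved
    }
}
```

Dropping the parameter, as suggested, would make the `Limit` case behave like the `Alias` case in this sketch.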



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1143,38 +1140,34 @@ class PlanResolutionSuite extends AnalysisTest {
 
       checkError(
         exception = intercept[AnalysisException] {
-          parseAndResolve(sql8)
+          parseAndResolve(sql8, checkAnalysis = true)

Review Comment:
   This sounds reasonable. We can leave this test here, but we don't have to cover every case exhaustively.



##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -1236,37 +1206,37 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
         sql("create table t(i boolean, s bigint) using parquet")
         assert(intercept[AnalysisException] {
           sql("insert into t (i) values (true)")
-        }.getMessage.contains(addOneColButExpectedTwo))
+        }.getMessage.contains("Cannot find data for output column 's'"))
       }
       withTable("t") {
         sql("create table t(i boolean default true, s bigint) using parquet")
         assert(intercept[AnalysisException] {
           sql("insert into t (i) values (default)")
-        }.getMessage.contains(addOneColButExpectedTwo))
+        }.getMessage.contains("Cannot find data for output column 's'"))

Review Comment:
   Can we deduplicate this expected error message substring into one place or, even better, use `checkError` to assert on the error class?
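
The idea of asserting on an error class instead of a message substring can be sketched without any test framework. Everything below is hypothetical: `ClassifiedError` stands in for an exception that carries an error class, `interceptErrorClass` is an illustrative helper rather than Spark's `checkError`, and `EXAMPLE_ERROR_CLASS` is a made-up name.

```scala
object ErrorClassAssertSketch {
  // Hypothetical exception carrying an error class; not Spark's AnalysisException.
  final class ClassifiedError(val errorClass: String, message: String)
    extends RuntimeException(message)

  // Runs the body and returns the error class of the ClassifiedError it throws.
  // Fails with an AssertionError if the body completes normally.
  def interceptErrorClass(body: => Unit): String =
    try {
      body
      throw new AssertionError("expected a ClassifiedError, but nothing was thrown")
    } catch {
      case e: ClassifiedError => e.errorClass
    }

  def main(args: Array[String]): Unit = {
    val cls = interceptErrorClass {
      throw new ClassifiedError("EXAMPLE_ERROR_CLASS", "Cannot find data for output column 's'")
    }
    // The assertion survives rewording of the message text.
    assert(cls == "EXAMPLE_ERROR_CLASS")
  }
}
```

An assertion of this shape keeps the expected value in one place and does not break when the message wording changes.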



##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -142,17 +142,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     )
   }
 
-  test("SELECT clause generating a different number of columns is not allowed.") {

Review Comment:
   Should we keep the test but change it to assert that the insert succeeds? Or is this behavior exercised elsewhere in this test file?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        // Reorder the fields in `expectedQuerySchema` according to the user-specified column list
+        // of the INSERT command.
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
+
+      case p: Project if acceptProject && p.child.resolved &&
+          p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          p.projectList.length <= expectedQuerySchema.length =>
+        val newProjectList = p.projectList.zipWithIndex.map {
+          case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+            val field = expectedQuerySchema(i)
+            Alias(getDefaultValueExpr(field).getOrElse(Literal(null, field.dataType)), u.name)()
+          case (other, _) if containsExplicitDefaultColumn(other) =>
+            throw QueryCompilationErrors
+              .defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList()
+          case (other, _) => other
+        }
+        val newChild = resolveColumnDefault(p.child, expectedQuerySchema, acceptProject = false)
+        val newProj = p.copy(projectList = newProjectList, child = newChild)
+        newProj.copyTagsFrom(p)
+        newProj
+
+      case _: Project | _: Aggregate if acceptInlineTable =>
+        plan.mapChildren(resolveColumnDefault(_, expectedQuerySchema, acceptProject = false))
+
+      case inlineTable: UnresolvedInlineTable if acceptInlineTable &&
+          inlineTable.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          inlineTable.rows.forall(exprs => exprs.length <= expectedQuerySchema.length) =>
+        val newRows = inlineTable.rows.map { exprs =>
+          exprs.zipWithIndex.map {
+            case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+              val field = expectedQuerySchema(i)
+              getDefaultValueExpr(field).getOrElse(Literal(null, field.dataType))

Review Comment:
   Optional: should we add a boolean argument to `getDefaultValueExprOrNullLiteral` to switch between the two behaviors?
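
The hunk quoted above matches each column "DEFAULT" to the field at the same position in the expected query schema, after that schema has been reordered to follow the user-specified column list. A minimal sketch of that idea with plain Scala collections follows. `Field`, `Expr`, `DefaultRef`, `Value`, and `DefaultByPosition` are hypothetical stand-ins for illustration, not Spark classes, and lower-casing is only an assumed form of name normalization.

```scala
// Hypothetical stand-ins for illustration only; these are not Spark classes.
final case class Field(name: String, default: Option[String])

sealed trait Expr
case object DefaultRef extends Expr               // stands in for the unresolved column "DEFAULT"
final case class Value(sql: String) extends Expr  // any other expression

object DefaultByPosition {
  // Assumed normalization: case-insensitive column names.
  private def normalize(name: String): String = name.toLowerCase

  /** Reorders `schema` to follow `userCols`; returns None if any column is unknown. */
  def reorder(schema: Seq[Field], userCols: Seq[String]): Option[Seq[Field]] = {
    val byName = schema.map(f => normalize(f.name) -> f).toMap
    val picked = userCols.map(c => byName.get(normalize(c)))
    if (picked.forall(_.isDefined)) Some(picked.flatten) else None
  }

  /** Replaces each DEFAULT with the default of the field at the same position, or NULL. */
  def fill(row: Seq[Expr], expected: Seq[Field]): Seq[Expr] = {
    require(row.length <= expected.length, "row has more values than expected columns")
    row.zipWithIndex.map {
      case (DefaultRef, i) => Value(expected(i).default.getOrElse("NULL"))
      case (other, _) => other
    }
  }
}
```

Returning `None` from `reorder` mirrors the hunk's choice to leave the plan untouched when a user-specified column cannot be found, so that a later step can report the error.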



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204298648


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1116,12 +1116,9 @@ class PlanResolutionSuite extends AnalysisTest {
             // Note that when resolving DEFAULT column references, the analyzer will insert literal
             // NULL values if the corresponding table does not define an explicit default value for
             // that column. This is intended.
-            Assignment(i: AttributeReference,
-              cast1 @ Cast(Literal(null, _), IntegerType, _, EvalMode.ANSI)),
-            Assignment(s: AttributeReference,
-              cast2 @ Cast(Literal(null, _), StringType, _, EvalMode.ANSI))),
-          None) if cast1.getTagValue(Cast.BY_TABLE_INSERTION).isDefined &&
-          cast2.getTagValue(Cast.BY_TABLE_INSERTION).isDefined =>
+            Assignment(i: AttributeReference, Literal(null, IntegerType)),
+            Assignment(s: AttributeReference, Literal(null, StringType))),

Review Comment:
   Now we create the null literal with the expected data type directly, so the `Cast` wrapper is no longer needed.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204311148


##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -863,42 +844,39 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
   }
 
   test("Allow user to insert specified columns into insertable view") {
-    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") {

Review Comment:
   Remove this conf setting, as it is `true` by default.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204075426


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1080,7 +1077,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _) =>

Review Comment:
   This change is needed. We want to resolve the table first, then the column "DEFAULT" in the query. This means we can't wait for the query to be resolved before resolving the table.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1202627437


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1278,53 +1275,10 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
   }
 
   /** Handle INSERT INTO for DSv2 */
-  object ResolveInsertInto extends Rule[LogicalPlan] {
-
-    /** Add a project to use the table column names for INSERT INTO BY NAME */
-    private def createProjectForByNameQuery(i: InsertIntoStatement): LogicalPlan = {

Review Comment:
   The code here is unchanged; it has just been moved to `ResolveInsertionBase`.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204307028


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -2218,54 +2208,29 @@ class PlanResolutionSuite extends AnalysisTest {
     }
   }
 
-  test("MERGE INTO TABLE - skip resolution on v2 tables that accept any schema") {
+  test("MERGE INTO TABLE - skip filling missing cols on v2 tables that accept any schema") {
     val sql =
       s"""
-         |MERGE INTO v2TableWithAcceptAnySchemaCapability AS target
+         |MERGE INTO testcat.v2TableWithAcceptAnySchemaCapability AS target
          |USING v2Table AS source
          |ON target.i = source.i
-         |WHEN MATCHED AND (target.s='delete')THEN DELETE
-         |WHEN MATCHED AND (target.s='update') THEN UPDATE SET target.s = source.s
-         |WHEN NOT MATCHED AND (target.s=DEFAULT)
-         |  THEN INSERT (target.i, target.s) values (source.i, source.s)
-         |WHEN NOT MATCHED BY SOURCE AND (target.s='delete') THEN DELETE
-         |WHEN NOT MATCHED BY SOURCE AND (target.s='update') THEN UPDATE SET target.s = target.i

Review Comment:
   Simplify the MERGE statement to focus on missing columns.





Re: [PR] [SPARK-43742][SQL] Refactor default column value resolution [spark]

Posted by "yaooqinn (via GitHub)" <gi...@apache.org>.
yaooqinn commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1424806347


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1080,7 +1077,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _) =>

Review Comment:
   If we remove the pattern guard here, some operations on `i.query` will fail later on. I created https://github.com/apache/spark/pull/44326 to fix this.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [WIP] refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1201711351


##########
sql/core/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumnsSuite.scala:
##########
@@ -129,111 +144,4 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
-
-  /**
-   * This is a new relation type that defines the 'customSchemaForInserts' method.
-   * Its implementation drops the last table column as it represents an internal pseudocolumn.
-   */
-  case class TableWithCustomInsertSchema(output: Seq[Attribute], numMetadataColumns: Int)
-    extends Table with SupportsCustomSchemaWrite {
-    override def name: String = "t"
-    override def schema: StructType = StructType.fromAttributes(output)
-    override def capabilities(): java.util.Set[TableCapability] =
-      new java.util.HashSet[TableCapability]()
-    override def customSchemaForInserts: StructType =
-      StructType(schema.fields.dropRight(numMetadataColumns))
-  }
-
-  /** Helper method to generate a DSV2 relation using the above table type. */
-  private def relationWithCustomInsertSchema(
-      output: Seq[AttributeReference], numMetadataColumns: Int): DataSourceV2Relation = {
-    DataSourceV2Relation(
-      TableWithCustomInsertSchema(output, numMetadataColumns),
-      output,
-      catalog = None,
-      identifier = None,
-      options = CaseInsensitiveStringMap.empty)
-  }
-
-  test("SPARK-43313: Add missing default values for MERGE INSERT actions") {

Review Comment:
   MERGE/UPDATE are tested in `Align[Update|Merge]AssignmentsSuite`





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204933513


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        // Reorder the fields in `expectedQuerySchema` according to the user-specified column list
+        // of the INSERT command.
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
+
+      case p: Project if acceptProject && p.child.resolved &&
+          p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          p.projectList.length <= expectedQuerySchema.length =>
+        val newProjectList = p.projectList.zipWithIndex.map {
+          case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+            val field = expectedQuerySchema(i)
+            Alias(getDefaultValueExpr(field).getOrElse(Literal(null, field.dataType)), u.name)()
+          case (other, _) if containsExplicitDefaultColumn(other) =>
+            throw QueryCompilationErrors
+              .defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList()
+          case (other, _) => other
+        }
+        val newChild = resolveColumnDefault(p.child, expectedQuerySchema, acceptProject = false)
+        val newProj = p.copy(projectList = newProjectList, child = newChild)
+        newProj.copyTagsFrom(p)
+        newProj
+
+      case _: Project | _: Aggregate if acceptInlineTable =>
+        plan.mapChildren(resolveColumnDefault(_, expectedQuerySchema, acceptProject = false))
+
+      case inlineTable: UnresolvedInlineTable if acceptInlineTable &&
+          inlineTable.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          inlineTable.rows.forall(exprs => exprs.length <= expectedQuerySchema.length) =>
+        val newRows = inlineTable.rows.map { exprs =>
+          exprs.zipWithIndex.map {
+            case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+              val field = expectedQuerySchema(i)
+              getDefaultValueExpr(field).getOrElse(Literal(null, field.dataType))

Review Comment:
   There is a subtle difference. For missing columns, filling in a null default is optional (controlled by a flag). The explicit column "DEFAULT" is a new feature introduced together with default value support, so we can always use null as the default value when none is defined.
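
To make the two behaviors concrete, here is a rough sketch in plain Scala. `Column` and `DefaultModes` are hypothetical names used only for illustration, defaults are modeled as SQL strings rather than Catalyst expressions, and the boolean parameter is assumed to play the role of the flag mentioned above.

```scala
// Hypothetical model for illustration; not Spark's API.
final case class Column(name: String, default: Option[String])

object DefaultModes {
  /** Explicit column "DEFAULT": always falls back to NULL when no default is defined. */
  def forExplicitDefault(col: Column): String =
    col.default.getOrElse("NULL")

  /** Missing column: falls back to NULL only when the flag allows it. */
  def forMissingColumn(col: Column, useNullsForMissingDefaults: Boolean): Option[String] =
    col.default.orElse(if (useNullsForMissingDefaults) Some("NULL") else None)
}
```

With this shape, a `None` result for a missing column leaves the caller free to raise an error, while the explicit "DEFAULT" path never needs to.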





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204930606


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet

Review Comment:
   `InsertIntoStatement#partitionSpec` is a `Map[String, Option[String]]`, so inside the placeholder lambda the map value is only reachable as `_._2`.
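
For illustration, here is the static-partition filter on a plain Scala map, next to a pattern-matching variant that names the key and value instead of using the tuple accessor. `StaticPartitionCols` and its method names are hypothetical, and lower-casing stands in for `normalizeFieldName` as an assumption.

```scala
object StaticPartitionCols {
  // Same shape as the partition spec discussed above: column name -> optional static value.
  type PartitionSpec = Map[String, Option[String]]

  /** Tuple-accessor style, as in the hunk under review. */
  def viaAccessor(spec: PartitionSpec): Set[String] =
    spec.filter(_._2.isDefined).keys.map(_.toLowerCase).toSet

  /** Pattern-matching style: keeps only entries whose value is defined. */
  def viaPattern(spec: PartitionSpec): Set[String] =
    spec.collect { case (name, Some(_)) => name.toLowerCase }.toSet
}
```

Both variants keep only partition columns that carry a static value; which one reads better is a matter of taste.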





[GitHub] [spark] gengliangwang commented on a diff in pull request #41262: [WIP] refactor default column value resolution

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1201438726


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInInsert.scala:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]]/[[UnresolvedInlineTable]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ */
+case object ResolveReferencesInInsert extends SQLConfHelper with ColumnResolutionHelper {

Review Comment:
   Rename to `ResolveColumnDefaultInInsert`?






[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204275042


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala:
##########
@@ -36,7 +37,9 @@ object TableOutputResolver {
       expected: Seq[Attribute],
       query: LogicalPlan,
       byName: Boolean,
-      conf: SQLConf): LogicalPlan = {
+      conf: SQLConf,
+      // TODO: Only DS v1 writing will set it to true. We should enable it for DS v2 as well.

Review Comment:
   This is an existing issue. I decided to fix it later, as doing so requires updating quite a few v2 tests.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204075426


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1080,7 +1077,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _) =>

Review Comment:
   This change is needed. We want to resolve the table first, then resolve the column "DEFAULT" in the query. This means we can't wait for the query to be resolved before resolving the table.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204308382


##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -142,17 +142,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     )
   }
 
-  test("SELECT clause generating a different number of columns is not allowed.") {

Review Comment:
   It should be allowed.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [WIP] refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1201710210


##########
sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala:
##########
@@ -201,30 +201,6 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("insert with column list - missing columns") {

Review Comment:
   Inserts with missing columns should not fail, and we test this in `ResolveDefaultColumnsSuite`.
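
   As a rough illustration of the intended behavior, the sketch below aligns a user-specified column list with the table schema and fills the gaps. It is plain Scala, not the Spark implementation: `Field`, `alignRow`, and the NULL fallback for columns without a declared default are assumptions made for this example.

   ```scala
   import java.util.Locale

   object FillMissingColumns {
     // Hypothetical stand-in for a table column with an optional default value.
     final case class Field(name: String, default: Option[Any])

     // Returns one value per table column, in table order. Columns that the
     // INSERT column list omits get their default, or null if none is declared.
     def alignRow(schema: Seq[Field], userCols: Seq[String], values: Seq[Any]): Seq[Any] = {
       require(userCols.length == values.length, "column list and values must have equal length")
       // Assumes case-insensitive column name matching.
       val provided: Map[String, Any] =
         userCols.map(_.toLowerCase(Locale.ROOT)).zip(values).toMap
       schema.map { field =>
         provided.getOrElse(field.name.toLowerCase(Locale.ROOT), field.default.getOrElse(null))
       }
     }
   }
   ```

   For a table `(id, score DEFAULT 0, note)`, a row inserted through the column list `(id, note)` would come out with `score` set to its default in this model.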





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1205271858


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        // Reorder the fields in `expectedQuerySchema` according to the user-specified column list
+        // of the INSERT command.
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
+
+      case p: Project if acceptProject && p.child.resolved &&
+          p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          p.projectList.length <= expectedQuerySchema.length =>
+        val newProjectList = p.projectList.zipWithIndex.map {

Review Comment:
   I'll add documentation for the `resolveColumnDefault` method.





[GitHub] [spark] dtenedor commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204667363


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet

Review Comment:
   This is a bit hard to read. Can we split the transformations across separate lines with named vals, and use an explicit name instead of `_2` to refer to the column?
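
   One possible shape for that refactoring, as a self-contained sketch. It assumes `partitionSpec` maps a partition column name to an optional static value, and it uses a local, hypothetical `normalizeFieldName` that lower-cases names; the real helper may behave differently (for example, depending on case sensitivity settings).

   ```scala
   import java.util.Locale

   object StaticPartitionColumns {
     // Hypothetical stand-in for the rule's `normalizeFieldName` helper.
     def normalizeFieldName(name: String): String = name.toLowerCase(Locale.ROOT)

     // Collects the normalized names of partition columns that have a static value.
     def staticPartitionColumns(partitionSpec: Map[String, Option[String]]): Set[String] = {
       // Keep only the partitions whose value is given statically in the INSERT.
       val staticPartitions = partitionSpec.filter { case (_, staticValue) =>
         staticValue.isDefined
       }
       val staticColumnNames = staticPartitions.keys
       staticColumnNames.map(normalizeFieldName).toSet
     }
   }
   ```

   The pattern match `case (_, staticValue)` names the tuple element that `_._2` left anonymous, and each intermediate `val` documents one step of the pipeline.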



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
+      val expectedQuerySchema = i.table.schema.filter { field =>

Review Comment:
   Can we have a brief comment explaining what this is?
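
   A sketch of what such a comment could say, shown on a simplified model of the filter. `Field`, `normalize`, and the comment wording are assumptions for illustration, inferred from the quoted hunk rather than taken from the final code.

   ```scala
   import java.util.Locale

   object ExpectedQuerySchemaSketch {
     // Hypothetical stand-in for StructField.
     final case class Field(name: String, dataType: String)

     // Hypothetical stand-in for `normalizeFieldName`; assumes case-insensitive names.
     def normalize(name: String): String = name.toLowerCase(Locale.ROOT)

     // The columns that the INSERT query is expected to produce, in table order:
     // every table column except those already pinned by a static partition value.
     def expectedQuerySchema(tableSchema: Seq[Field], staticPartCols: Set[String]): Seq[Field] =
       tableSchema.filterNot(field => staticPartCols.contains(normalize(field.name)))
   }
   ```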



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        // Reorder the fields in `expectedQuerySchema` according to the user-specified column list
+        // of the INSERT command.
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
+
+      case p: Project if acceptProject && p.child.resolved &&
+          p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          p.projectList.length <= expectedQuerySchema.length =>
+        val newProjectList = p.projectList.zipWithIndex.map {
+          case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+            val field = expectedQuerySchema(i)
+            Alias(getDefaultValueExpr(field).getOrElse(Literal(null, field.dataType)), u.name)()
+          case (other, _) if containsExplicitDefaultColumn(other) =>
+            throw QueryCompilationErrors
+              .defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList()
+          case (other, _) => other
+        }
+        val newChild = resolveColumnDefault(p.child, expectedQuerySchema, acceptProject = false)
+        val newProj = p.copy(projectList = newProjectList, child = newChild)
+        newProj.copyTagsFrom(p)
+        newProj
+
+      case _: Project | _: Aggregate if acceptInlineTable =>
+        plan.mapChildren(resolveColumnDefault(_, expectedQuerySchema, acceptProject = false))
+
+      case inlineTable: UnresolvedInlineTable if acceptInlineTable &&
+          inlineTable.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          inlineTable.rows.forall(exprs => exprs.length <= expectedQuerySchema.length) =>
+        val newRows = inlineTable.rows.map { exprs =>
+          exprs.zipWithIndex.map {
+            case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+              val field = expectedQuerySchema(i)
+              getDefaultValueExpr(field).getOrElse(Literal(null, field.dataType))

Review Comment:
   We could fold the `Literal(null)` fallback into `getDefaultValueExpr`, since in every case we want to use NULL when the default metadata is not present. Or is that what `getDefaultValueExprOrNullLiteral` does, in which case we can use it instead?
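
   The suggestion amounts to a small wrapper like the one sketched below. The types `Expr`, `Lit`, and `Field` are hypothetical stand-ins, and the body of `getDefaultValueExprOrNullLiteral` is an assumption about what such a helper would do; it is not copied from Spark.

   ```scala
   object DefaultOrNullSketch {
     sealed trait Expr
     // Hypothetical stand-in for a typed literal expression.
     final case class Lit(value: Any, dataType: String) extends Expr
     // Hypothetical stand-in for StructField with optional default-value metadata.
     final case class Field(name: String, dataType: String, defaultValue: Option[Expr])

     // Returns the declared default, if any.
     def getDefaultValueExpr(field: Field): Option[Expr] = field.defaultValue

     // Folds the NULL fallback into one place so call sites need no `getOrElse`.
     def getDefaultValueExprOrNullLiteral(field: Field): Expr =
       getDefaultValueExpr(field).getOrElse(Lit(null, field.dataType))
   }
   ```

   Call sites in both the `Project` and the inline-table branches could then share the same fallback instead of repeating it.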



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        // Reorder the fields in `expectedQuerySchema` according to the user-specified column list
+        // of the INSERT command.
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
+
+      case p: Project if acceptProject && p.child.resolved &&
+          p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          p.projectList.length <= expectedQuerySchema.length =>
+        val newProjectList = p.projectList.zipWithIndex.map {

Review Comment:
   Can we have a comment here describing the logic: adding new unresolved attributes referring to "DEFAULT" if the provided query has fewer columns than the target table, or else converting such existing unresolved attributes to their corresponding default values?
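
   For reference, the positional replacement in the quoted hunk can be modeled as follows. This is a simplified sketch with hypothetical types (`UnresolvedAttr`, `Lit`, `Add`, `Field`) and a generic exception in place of Spark's error class. It only covers converting existing "DEFAULT" references and does not pad missing columns.

   ```scala
   object ResolveDefaultInProjectList {
     sealed trait Expr
     final case class UnresolvedAttr(name: String) extends Expr
     final case class Lit(value: Any) extends Expr
     final case class Add(left: Expr, right: Expr) extends Expr
     // Hypothetical stand-in for a target column with an optional default expression.
     final case class Field(name: String, default: Option[Expr])

     // True only for a bare, top-level reference to DEFAULT.
     def isExplicitDefault(e: Expr): Boolean = e match {
       case UnresolvedAttr(name) => name.equalsIgnoreCase("DEFAULT")
       case _ => false
     }

     // True if DEFAULT appears anywhere inside the expression tree.
     def containsExplicitDefault(e: Expr): Boolean = e match {
       case Add(left, right) => containsExplicitDefault(left) || containsExplicitDefault(right)
       case other => isExplicitDefault(other)
     }

     def resolve(projectList: Seq[Expr], expected: Seq[Field]): Seq[Expr] = {
       require(projectList.length <= expected.length, "query has more columns than the target")
       projectList.zipWithIndex.map {
         // The i-th query column maps to the i-th expected column, so a bare
         // DEFAULT becomes that column's default value, or NULL if it has none.
         case (e, i) if isExplicitDefault(e) => expected(i).default.getOrElse(Lit(null))
         // DEFAULT nested inside a larger expression is rejected.
         case (e, _) if containsExplicitDefault(e) =>
           throw new IllegalArgumentException("DEFAULT is not allowed in complex expressions")
         case (e, _) => e
       }
     }
   }
   ```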



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * A virtual rule to resolve [[UnresolvedAttribute]] in [[UpdateTable]]. It's only used by the real
+ * rule `ResolveReferences`. The column resolution order for [[UpdateTable]] is:
+ * 1. Resolves the column to [[AttributeReference]] with the output of the child plan. This
+ *    includes metadata columns as well.
+ * 2. Resolves the column to a literal function which is allowed to be invoked without braces, e.g.
+ *    `SELECT col, current_date FROM t`.
+ * 3. Resolves the column to the default value expression, if the column is the assignment value
+ *    and the corresponding assignment key is a top-level column.
+ */
+case object ResolveReferencesInUpdate extends SQLConfHelper with ColumnResolutionHelper {
+
+  def apply(u: UpdateTable): UpdateTable = {
+    assert(u.table.resolved)
+    if (u.resolved) return u
+
+    val newAssignments = u.assignments.map { assign =>
+      val resolvedKey = assign.key match {
+        case c if !c.resolved =>
+          resolveExprInAssignment(c, u)
+        case o => o
+      }
+      val resolvedValue = assign.value match {
+        case c if !c.resolved =>
+          val resolved = resolveExprInAssignment(c, u)
+          resolvedKey match {
+            case attr: AttributeReference if conf.enableDefaultColumns =>
+              resolved match {
+                case u: UnresolvedAttribute if isExplicitDefaultColumn(u) =>

Review Comment:
   Same here: let's add a comment mentioning that we're looking for unresolved attribute references to "DEFAULT" and replacing them.
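
   A minimal model of that step, for illustration only. `ColumnRef`, `UnresolvedAttr`, `Lit`, `Assignment`, and `resolveAssignment` are hypothetical names, and the NULL fallback mirrors the INSERT hunk rather than anything confirmed for UPDATE.

   ```scala
   object ResolveDefaultInUpdate {
     sealed trait Expr
     final case class UnresolvedAttr(name: String) extends Expr
     // Hypothetical stand-in for a resolved top-level column with an optional default.
     final case class ColumnRef(name: String, default: Option[Expr]) extends Expr
     final case class Lit(value: Any) extends Expr
     final case class Assignment(key: Expr, value: Expr)

     def resolveAssignment(a: Assignment): Assignment = (a.key, a.value) match {
       // Look for an unresolved reference to "DEFAULT" on the value side of an
       // assignment to a top-level column, and replace it with that column's
       // default value (NULL if the column declares none).
       case (col: ColumnRef, UnresolvedAttr(name)) if name.equalsIgnoreCase("DEFAULT") =>
         a.copy(value = col.default.getOrElse(Lit(null)))
       case _ => a
     }
   }
   ```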



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1080,7 +1077,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _) =>

Review Comment:
   Sounds good. With any luck this can help reduce dependencies on rule ordering within the analyzer.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204297067


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlignMergeAssignmentsSuite.scala:
##########
@@ -602,14 +602,6 @@ class AlignMergeAssignmentsSuite extends AlignAssignmentsSuite {
   }
 
   test("invalid INSERT assignments") {
-    assertAnalysisException(
-      """MERGE INTO primitive_table t USING primitive_table src
-        |ON t.i = src.i
-        |WHEN NOT MATCHED THEN
-        | INSERT (i, txt) VALUES (src.i, src.txt)
-        |""".stripMargin,
-      "No assignment for 'l'")

Review Comment:
   We have tests to make sure missing columns will be filled with default values, e.g. https://github.com/apache/spark/pull/41262/files#diff-960688d2ad5179d1592810c50de1a163364c01c5f164bbded5d0d0dce05b39fdR859





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1206236379


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keySet.map(normalizeFieldName)
+      // For INSERT with static partitions, such as `INSERT INTO t PARTITION(c=1) SELECT ...`, the
+      // input query schema should match the table schema excluding columns with static
+      // partition values.
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      // Normally, we should match the query schema with the table schema by position. If the n-th
+      // column of the query is the DEFAULT column, we should get the default value expression
+      // defined for the n-th column of the table. However, if the INSERT has a column list, such as
+      // `INSERT INTO t(b, c, a)`, the matching should be by name. For example, the first column of
+      // the query should match the column 'b' of the table.
+      // To simplify the implementation, `resolveColumnDefault` always does by-position match. If
+      // the INSERT has a column list, we reorder the table schema w.r.t. the column list and pass
+      // the reordered schema as the expected schema to `resolveColumnDefault`.
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  /**
+   * Resolves the column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]]. A column is a
+   * "DEFAULT" column if all the following conditions are met:
+   * 1. The expression inside project list or inline table expressions is a single
+   *    [[UnresolvedAttribute]] with name "DEFAULT". This means `SELECT DEFAULT, ...` is valid but
+   *    `SELECT DEFAULT + 1, ...` is not.
+   * 2. The project list or inline table expressions have less elements than the expected schema.
+   *    To find the default value definition, we need to find the matching column for expressions
+   *    inside project list or inline table expressions. This matching is by position and it
+   *    doesn't make sense if we have more expressions than the columns of expected schema.
+   * 3. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+   *    all unary nodes that inherit the output columns from its child.
+   * 4. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+   *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+   */
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))

Review Comment:
   I don't think it's necessary; I just wanted to keep the old behavior. Let me remove it.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204305443


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala:
##########
@@ -1143,38 +1140,34 @@ class PlanResolutionSuite extends AnalysisTest {
 
       checkError(
         exception = intercept[AnalysisException] {
-          parseAndResolve(sql8)
+          parseAndResolve(sql8, checkAnalysis = true)

Review Comment:
   `sql8` is `s"UPDATE testcat.defaultvalues SET i=DEFAULT, s=DEFAULT WHERE i=DEFAULT"`
   
   I think it's almost impossible to enumerate every improper place where the column "DEFAULT" could appear, e.g. what about the UPDATE/MERGE assignment key, or other operators like Sort? This PR only checks for a nested column "DEFAULT" and fails in that case. If the column "DEFAULT" appears in any other improper place, we won't resolve it and users will hit an unresolved column error.
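
The distinction drawn above, between a standalone "DEFAULT" and one nested inside a larger expression, can be sketched on a toy expression tree. This is only an illustration under stated assumptions: `Expr`, `Attr`, `Lit`, `Add`, and `DefaultPlacement` are hypothetical and are not Catalyst classes, and the name comparison is assumed to be case-insensitive.

```scala
// Hypothetical mini expression tree; these are NOT Spark's Catalyst classes.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class Lit(value: Int) extends Expr
final case class Add(left: Expr, right: Expr) extends Expr

object DefaultPlacement {
  sealed trait Placement
  case object TopLevel extends Placement // e.g. SET i = DEFAULT: can be replaced by the default value
  case object Nested extends Placement   // e.g. SET i = DEFAULT + 1: reported as an error
  case object Absent extends Placement   // no reference to DEFAULT at all

  // True only when the whole expression is a single attribute named DEFAULT.
  private def isDefault(e: Expr): Boolean = e match {
    case Attr(n) => n.equalsIgnoreCase("DEFAULT")
    case _ => false
  }

  // True when DEFAULT appears anywhere in the tree, including at the root.
  private def containsDefault(e: Expr): Boolean = e match {
    case Add(l, r) => containsDefault(l) || containsDefault(r)
    case other => isDefault(other)
  }

  def classify(e: Expr): Placement =
    if (isDefault(e)) TopLevel
    else if (containsDefault(e)) Nested
    else Absent
}
```

In this model only `TopLevel` would be substituted and only `Nested` would raise a dedicated error; a "DEFAULT" in any other position is simply left unresolved, which matches the fallback described in the comment.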





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204293436


##########
sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala:
##########
@@ -1117,15 +1117,8 @@ class DataSourceV2SQLSuiteV1Filter
         exception = intercept[AnalysisException] {
           sql(s"INSERT INTO $t1(data, data) VALUES(5)")
         },
-        errorClass = "_LEGACY_ERROR_TEMP_2305",
-        parameters = Map(
-          "numCols" -> "3",
-          "rowSize" -> "2",
-          "ri" -> "0"),
-        context = ExpectedContext(
-          fragment = s"INSERT INTO $t1(data, data)",
-          start = 0,
-          stop = 26))
+        errorClass = "COLUMN_ALREADY_EXISTS",

Review Comment:
   The error was changed by https://github.com/apache/spark/commit/9f0bf51a3a7f6175de075198e00a55bfdc491f15 and is now restored. I think it's more accurate to report a column-already-exists error than an inline table error.
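
For readers unfamiliar with the check behind this error, a minimal sketch of detecting a repeated name in an INSERT column list such as `(data, data)` follows. `ColumnListCheck` and `firstDuplicate` are hypothetical names, and the sketch assumes case-insensitive comparison; in Spark the comparison depends on the configured resolver.

```scala
import java.util.Locale
import scala.collection.mutable

object ColumnListCheck {
  // Returns the first column name that repeats an earlier one, comparing case-insensitively.
  def firstDuplicate(cols: Seq[String]): Option[String] = {
    val seen = mutable.Set.empty[String]
    // `add` returns false when the normalized name was already in the set.
    cols.find(c => !seen.add(c.toLowerCase(Locale.ROOT)))
  }
}
```

Running this check before the column count is compared against the VALUES row is what lets the duplicate be reported as such, instead of surfacing later as a size mismatch.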





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204257927


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3346,53 +3313,6 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
-  /**
-   * A special rule to reorder columns for DSv1 when users specify a column list in INSERT INTO.
-   * DSv2 is handled by [[ResolveInsertInto]] separately.
-   */
-  object ResolveUserSpecifiedColumns extends Rule[LogicalPlan] {
-    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
-      AlwaysProcess.fn, ruleId) {
-      case i: InsertIntoStatement if !i.table.isInstanceOf[DataSourceV2Relation] &&
-          i.table.resolved && i.query.resolved && i.userSpecifiedCols.nonEmpty =>
-        val resolved = resolveUserSpecifiedColumns(i)
-        val projection = addColumnListOnQuery(i.table.output, resolved, i.query)
-        i.copy(userSpecifiedCols = Nil, query = projection)
-    }
-
-    private def resolveUserSpecifiedColumns(i: InsertIntoStatement): Seq[NamedExpression] = {
-      SchemaUtils.checkColumnNameDuplication(i.userSpecifiedCols, resolver)
-
-      i.userSpecifiedCols.map { col =>
-        i.table.resolve(Seq(col), resolver).getOrElse {
-          val candidates = i.table.output.map(_.qualifiedName)
-          val orderedCandidates = StringUtils.orderSuggestedIdentifiersBySimilarity(col, candidates)
-          throw QueryCompilationErrors
-            .unresolvedAttributeError("UNRESOLVED_COLUMN", col, orderedCandidates, i.origin)
-        }
-      }
-    }
-
-    private def addColumnListOnQuery(
-        tableOutput: Seq[Attribute],
-        cols: Seq[NamedExpression],
-        query: LogicalPlan): LogicalPlan = {
-      if (cols.size != query.output.size) {
-        throw QueryCompilationErrors.writeTableWithMismatchedColumnsError(
-          cols.size, query.output.size, query)
-      }
-      val nameToQueryExpr = CUtils.toMap(cols, query.output)
-      // Static partition columns in the table output should not appear in the column list
-      // they will be handled in another rule ResolveInsertInto

Review Comment:
   It's fragile to resolve the insert column list and static partitions separately. This PR resolves both in `PreprocessTableInsertion` for v1 inserts. Spark already resolves both for v2 inserts in `ResolveInsertInto`.





[GitHub] [spark] dongjoon-hyun commented on pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #41262:
URL: https://github.com/apache/spark/pull/41262#issuecomment-1566303092

   Merged to master for Apache Spark 3.5.0.




[GitHub] [spark] dongjoon-hyun closed pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun closed pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution
URL: https://github.com/apache/spark/pull/41262




[GitHub] [spark] gengliangwang commented on a diff in pull request #41262: [WIP] refactor default column value resolution

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1201437529


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInInsert.scala:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]]/[[UnresolvedInlineTable]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ */
+case object ResolveReferencesInInsert extends SQLConfHelper with ColumnResolutionHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {

Review Comment:
   nit: `plan` is always `InsertIntoStatement` from the caller side. I also noticed that the input of `ResolveReferencesInUpdate` is `UpdateTable`. Shall we make them consistent?





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1206240373


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keySet.map(normalizeFieldName)
+      // For INSERT with static partitions, such as `INSERT INTO t PARTITION(c=1) SELECT ...`, the
+      // input query schema should match the table schema excluding columns with static
+      // partition values.
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      // Normally, we should match the query schema with the table schema by position. If the n-th
+      // column of the query is the DEFAULT column, we should get the default value expression
+      // defined for the n-th column of the table. However, if the INSERT has a column list, such as
+      // `INSERT INTO t(b, c, a)`, the matching should be by name. For example, the first column of
+      // the query should match the column 'b' of the table.
+      // To simplify the implementation, `resolveColumnDefault` always does by-position match. If
+      // the INSERT has a column list, we reorder the table schema w.r.t. the column list and pass
+      // the reordered schema as the expected schema to `resolveColumnDefault`.
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  /**
+   * Resolves the column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]]. A column is a
+   * "DEFAULT" column if all the following conditions are met:
+   * 1. The expression inside project list or inline table expressions is a single
+   *    [[UnresolvedAttribute]] with name "DEFAULT". This means `SELECT DEFAULT, ...` is valid but
+   *    `SELECT DEFAULT + 1, ...` is not.
+   * 2. The project list or inline table expressions have less elements than the expected schema.
+   *    To find the default value definition, we need to find the matching column for expressions
+   *    inside project list or inline table expressions. This matching is by position and it
+   *    doesn't make sense if we have more expressions than the columns of expected schema.
+   * 3. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+   *    all unary nodes that inherit the output columns from its child.
+   * 4. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+   *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+   */
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
+
+      case p: Project if acceptProject && p.child.resolved &&
+          p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          p.projectList.length <= expectedQuerySchema.length =>
+        val newProjectList = p.projectList.zipWithIndex.map {
+          case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+            val field = expectedQuerySchema(i)

Review Comment:
   `tableSchema` is not very accurate, and neither is `insertTargetTableSchemaWithoutPartitionColumns`. It's actually the table schema excluding partition columns with static values.
   
   That's why I chose `expectedQuerySchema`. People can read the comments at the call site of this function to understand how we define the expected query schema.
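
To make the definition concrete, here is a simplified, standalone sketch of how an expected query schema could be derived, following the logic in the quoted diff: drop the columns that have static partition values, then reorder by the user-specified column list if there is one. `Field`, `ExpectedSchema`, and `norm` are hypothetical stand-ins for `StructField`, the rule body, and `normalizeFieldName`; case-insensitive matching is an assumption.

```scala
import java.util.Locale

// Hypothetical stand-in for StructField: only the column name matters here.
final case class Field(name: String)

object ExpectedSchema {
  private def norm(s: String): String = s.toLowerCase(Locale.ROOT)

  // partitionSpec maps a partition column to Some(value) when static, None when dynamic.
  // Returns None if the column list names a column that is not in the remaining schema.
  def expectedQuerySchema(
      tableSchema: Seq[Field],
      partitionSpec: Map[String, Option[String]],
      userSpecifiedCols: Seq[String]): Option[Seq[Field]] = {
    val staticPartCols = partitionSpec.collect { case (k, Some(_)) => norm(k) }.toSet
    val withoutStatic = tableSchema.filterNot(f => staticPartCols.contains(norm(f.name)))
    if (userSpecifiedCols.isEmpty) {
      Some(withoutStatic)
    } else {
      val byName = withoutStatic.map(f => norm(f.name) -> f).toMap
      val reordered = userSpecifiedCols.map(c => byName.get(norm(c)))
      if (reordered.forall(_.isDefined)) Some(reordered.flatten) else None
    }
  }
}
```

Because the result is already in the order the query must produce, a by-position lookup such as `expectedQuerySchema(i)` finds the right column whether or not the INSERT has a column list.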





[GitHub] [spark] gengliangwang commented on pull request #41262: [WIP] refactor default column value resolution

Posted by "gengliangwang (via GitHub)" <gi...@apache.org>.
gengliangwang commented on PR #41262:
URL: https://github.com/apache/spark/pull/41262#issuecomment-1558435015

   LGTM, pending tests.
   Awesome refactoring!




[GitHub] [spark] dongjoon-hyun commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1201838685


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]]/[[UnresolvedInlineTable]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO: support v2 write commands as well.

Review Comment:
   If you don't mind, please file a JIRA and use the IDed TODO here.





[GitHub] [spark] cloud-fan commented on a diff in pull request #41262: [WIP] refactor default column value resolution

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1201531331


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInInsert.scala:
##########
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]]/[[UnresolvedInlineTable]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ */
+case object ResolveReferencesInInsert extends SQLConfHelper with ColumnResolutionHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {

Review Comment:
   We will add v2 write commands later. I'll add a TODO here.


