You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2023/05/25 00:36:22 UTC

[GitHub] [spark] dtenedor commented on a diff in pull request #41262: [SPARK-43742][SQL] Refactor default column value resolution

dtenedor commented on code in PR #41262:
URL: https://github.com/apache/spark/pull/41262#discussion_r1204667363


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet

Review Comment:
   This is a bit hard to read. Can we split the chained transformations into separate lines with named `val`s, and use an explicit name instead of `_2` to refer to the column?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
+      val expectedQuerySchema = i.table.schema.filter { field =>

Review Comment:
   Can we add a brief comment explaining what `expectedQuerySchema` represents?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        // Reorder the fields in `expectedQuerySchema` according to the user-specified column list
+        // of the INSERT command.
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
+
+      case p: Project if acceptProject && p.child.resolved &&
+          p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          p.projectList.length <= expectedQuerySchema.length =>
+        val newProjectList = p.projectList.zipWithIndex.map {
+          case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+            val field = expectedQuerySchema(i)
+            Alias(getDefaultValueExpr(field).getOrElse(Literal(null, field.dataType)), u.name)()
+          case (other, _) if containsExplicitDefaultColumn(other) =>
+            throw QueryCompilationErrors
+              .defaultReferencesNotAllowedInComplexExpressionsInInsertValuesList()
+          case (other, _) => other
+        }
+        val newChild = resolveColumnDefault(p.child, expectedQuerySchema, acceptProject = false)
+        val newProj = p.copy(projectList = newProjectList, child = newChild)
+        newProj.copyTagsFrom(p)
+        newProj
+
+      case _: Project | _: Aggregate if acceptInlineTable =>
+        plan.mapChildren(resolveColumnDefault(_, expectedQuerySchema, acceptProject = false))
+
+      case inlineTable: UnresolvedInlineTable if acceptInlineTable &&
+          inlineTable.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          inlineTable.rows.forall(exprs => exprs.length <= expectedQuerySchema.length) =>
+        val newRows = inlineTable.rows.map { exprs =>
+          exprs.zipWithIndex.map {
+            case (u: UnresolvedAttribute, i) if isExplicitDefaultColumn(u) =>
+              val field = expectedQuerySchema(i)
+              getDefaultValueExpr(field).getOrElse(Literal(null, field.dataType))

Review Comment:
   We could fold the `Literal(null)` fallback into `getDefaultValueExpr`, since in every case we want to use the NULL value when the default metadata is not present. Or is this what `getDefaultValueExprOrNullLiteral` does, in which case we can use that instead?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveColumnDefaultInInsert.scala:
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_ATTRIBUTE
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.StructField
+
+/**
+ * A virtual rule to resolve column "DEFAULT" in [[Project]] and [[UnresolvedInlineTable]] under
+ * [[InsertIntoStatement]]. It's only used by the real rule `ResolveReferences`.
+ *
+ * This virtual rule is triggered if:
+ * 1. The column "DEFAULT" can't be resolved normally by `ResolveReferences`. This is guaranteed as
+ *    `ResolveReferences` resolves the query plan bottom up. This means that when we reach here to
+ *    resolve [[InsertIntoStatement]], its child plans have already been resolved by
+ *    `ResolveReferences`.
+ * 2. The plan nodes between [[Project]] and [[InsertIntoStatement]] are
+ *    all unary nodes that inherit the output columns from its child.
+ * 3. The plan nodes between [[UnresolvedInlineTable]] and [[InsertIntoStatement]] are either
+ *    [[Project]], or [[Aggregate]], or [[SubqueryAlias]].
+ */
+case object ResolveColumnDefaultInInsert extends SQLConfHelper with ColumnResolutionHelper {
+  // TODO (SPARK-43752): support v2 write commands as well.
+  def apply(plan: LogicalPlan): LogicalPlan = plan match {
+    case i: InsertIntoStatement if conf.enableDefaultColumns && i.table.resolved &&
+        i.query.containsPattern(UNRESOLVED_ATTRIBUTE) =>
+      val staticPartCols = i.partitionSpec.filter(_._2.isDefined).keys.map(normalizeFieldName).toSet
+      val expectedQuerySchema = i.table.schema.filter { field =>
+        !staticPartCols.contains(normalizeFieldName(field.name))
+      }
+      if (i.userSpecifiedCols.isEmpty) {
+        i.withNewChildren(Seq(resolveColumnDefault(i.query, expectedQuerySchema)))
+      } else {
+        // Reorder the fields in `expectedQuerySchema` according to the user-specified column list
+        // of the INSERT command.
+        val colNamesToFields: Map[String, StructField] = expectedQuerySchema.map { field =>
+          normalizeFieldName(field.name) -> field
+        }.toMap
+        val reorder = i.userSpecifiedCols.map { col =>
+          colNamesToFields.get(normalizeFieldName(col))
+        }
+        if (reorder.forall(_.isDefined)) {
+          i.withNewChildren(Seq(resolveColumnDefault(i.query, reorder.flatten)))
+        } else {
+          i
+        }
+      }
+
+    case _ => plan
+  }
+
+  private def resolveColumnDefault(
+      plan: LogicalPlan,
+      expectedQuerySchema: Seq[StructField],
+      acceptProject: Boolean = true,
+      acceptInlineTable: Boolean = true): LogicalPlan = {
+    plan match {
+      case _: SubqueryAlias =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptProject, acceptInlineTable))
+
+      case _: GlobalLimit | _: LocalLimit | _: Offset | _: Sort if acceptProject =>
+        plan.mapChildren(
+          resolveColumnDefault(_, expectedQuerySchema, acceptInlineTable = false))
+
+      case p: Project if acceptProject && p.child.resolved &&
+          p.containsPattern(UNRESOLVED_ATTRIBUTE) &&
+          p.projectList.length <= expectedQuerySchema.length =>
+        val newProjectList = p.projectList.zipWithIndex.map {

Review Comment:
   Can we add a comment here describing the logic? That is: adding new unresolved attributes referring to "DEFAULT" if the provided query has fewer columns than the target table, or else converting such existing unresolved attributes to their corresponding values.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveReferencesInUpdate.scala:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.SQLConfHelper
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.{Assignment, UpdateTable}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.{containsExplicitDefaultColumn, getDefaultValueExpr, isExplicitDefaultColumn}
+import org.apache.spark.sql.errors.QueryCompilationErrors
+
+/**
+ * A virtual rule to resolve [[UnresolvedAttribute]] in [[UpdateTable]]. It's only used by the real
+ * rule `ResolveReferences`. The column resolution order for [[UpdateTable]] is:
+ * 1. Resolves the column to [[AttributeReference]] with the output of the child plan. This
+ *    includes metadata columns as well.
+ * 2. Resolves the column to a literal function which is allowed to be invoked without braces, e.g.
+ *    `SELECT col, current_date FROM t`.
+ * 3. Resolves the column to the default value expression, if the column is the assignment value
+ *    and the corresponding assignment key is a top-level column.
+ */
+case object ResolveReferencesInUpdate extends SQLConfHelper with ColumnResolutionHelper {
+
+  def apply(u: UpdateTable): UpdateTable = {
+    assert(u.table.resolved)
+    if (u.resolved) return u
+
+    val newAssignments = u.assignments.map { assign =>
+      val resolvedKey = assign.key match {
+        case c if !c.resolved =>
+          resolveExprInAssignment(c, u)
+        case o => o
+      }
+      val resolvedValue = assign.value match {
+        case c if !c.resolved =>
+          val resolved = resolveExprInAssignment(c, u)
+          resolvedKey match {
+            case attr: AttributeReference if conf.enableDefaultColumns =>
+              resolved match {
+                case u: UnresolvedAttribute if isExplicitDefaultColumn(u) =>

Review Comment:
   Same here: let's add a comment mentioning that we're looking for unresolved attribute references to "DEFAULT" and replacing them.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1080,7 +1077,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
 
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _) if i.query.resolved =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _) =>

Review Comment:
   Sounds good. With any luck, this can help reduce dependencies on rule ordering within the analyzer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org