You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/02 15:01:19 UTC

[GitHub] [spark] gengliangwang commented on a diff in pull request #36398: [SPARK-38838][SQL] Refactor ResolveDefaultColumns.scala to simplify helper methods

gengliangwang commented on code in PR #36398:
URL: https://github.com/apache/spark/pull/36398#discussion_r862923822


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -50,60 +52,116 @@ import org.apache.spark.sql.types._
 case class ResolveDefaultColumns(
   analyzer: Analyzer,
   catalog: SessionCatalog) extends Rule[LogicalPlan] {
-
-  // This field stores the enclosing INSERT INTO command, once we find one.
-  var enclosingInsert: Option[InsertIntoStatement] = None
-  // This field stores the schema of the target table of the above command.
-  var insertTableSchemaWithoutPartitionColumns: Option[StructType] = None
-
   override def apply(plan: LogicalPlan): LogicalPlan = {
-    // Initialize by clearing our reference to the enclosing INSERT INTO command.
-    enclosingInsert = None
-    insertTableSchemaWithoutPartitionColumns = None
-    // Traverse the logical query plan in preorder (top-down).
     plan.resolveOperatorsWithPruning(
       (_ => SQLConf.get.enableDefaultColumns), ruleId) {
-      case i@InsertIntoStatement(_, _, _, _, _, _)
-        if i.query.collectFirst { case u: UnresolvedInlineTable
-          if u.rows.nonEmpty && u.rows.forall(_.size == u.rows(0).size) => u
-        }.isDefined =>
-        enclosingInsert = Some(i)
-        insertTableSchemaWithoutPartitionColumns = getInsertTableSchemaWithoutPartitionColumns
-        val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i)
-        regenerated
-
-      case table: UnresolvedInlineTable
-        if enclosingInsert.isDefined =>
-        val expanded: UnresolvedInlineTable = addMissingDefaultColumnValues(table).getOrElse(table)
-        val replaced: LogicalPlan =
-          replaceExplicitDefaultColumnValues(analyzer, expanded).getOrElse(table)
-        replaced
-
+      case i: InsertIntoStatement if insertsFromInlineTable(i) =>
+        resolveDefaultColumnsForInsertFromInlineTable(i)
       case i@InsertIntoStatement(_, _, _, project: Project, _, _)
         if !project.projectList.exists(_.isInstanceOf[Star]) =>
-        enclosingInsert = Some(i)
-        insertTableSchemaWithoutPartitionColumns = getInsertTableSchemaWithoutPartitionColumns
-        val expanded: Project = addMissingDefaultColumnValues(project).getOrElse(project)
-        val replaced: Option[LogicalPlan] = replaceExplicitDefaultColumnValues(analyzer, expanded)
-        val updated: InsertIntoStatement =
-          if (replaced.isDefined) i.copy(query = replaced.get) else i
-        val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(updated)
-        regenerated
+        resolveDefaultColumnsForInsertFromProject(i)
+    }
+  }
+
+  /**
+   * Checks if a logical plan is an INSERT INTO command where the inserted data comes from a VALUES
+   * list, with possible projection(s) and/or alias(es) in between.
+   */
+  private def insertsFromInlineTable(i: InsertIntoStatement): Boolean = {
+    var query = i.query
+    while (query.children.size == 1) {
+      query match {
+        case _: Project | _: SubqueryAlias =>
+          query = query.children(0)
+        case _ =>
+          return false
+      }
     }
+    query match {
+      case u: UnresolvedInlineTable
+        if u.rows.nonEmpty && u.rows.forall(_.size == u.rows(0).size) =>
+        true
+      case _ =>
+        false
+    }
+  }
+
+  /**
+   * Resolves DEFAULT column references for an INSERT INTO command satisfying the
+   * [[insertsFromInlineTable]] method.
+   */
+  private def resolveDefaultColumnsForInsertFromInlineTable(i: InsertIntoStatement): LogicalPlan = {
+    val children = mutable.Buffer.empty[LogicalPlan]
+    var node = i.query
+    while (node match {

Review Comment:
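   nit: shall we write it like this instead?
   ```
   while (node.children.size == 1) {
     children.append(node)
     node = node.children(0)
   }
   ```
   That way the loop condition has no side effects. The pattern match in the condition isn't needed here. This method is only called after `insertsFromInlineTable` has returned true, which already guarantees that every single-child node along the chain is a `Project` or `SubqueryAlias`.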



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org