You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "hvanhovell (via GitHub)" <gi...@apache.org> on 2023/12/04 17:58:01 UTC

[PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

hvanhovell opened a new pull request, #44162:
URL: https://github.com/apache/spark/pull/44162

   ### What changes were proposed in this pull request?
   This PR makes the following two changes:
   - The `withColumns` message is now stackable. This means that stacked `withColumns` calls can now be represented by a single message. This should reduce the chance of hitting protobuf recursion limits.
   - Resolution of `withColumns` calls is moved into the analyzer. This should make resolving stacked withColumns call significantly faster.
   
   ### Why are the changes needed?
   It is fairly common to stack withColumns calls on top of each other (e.g. when computing stats for all columns in a Dataframe). In connect plans with large withColumns chains tend to hit the protobuf recursion limit quickly. On top of this building a dataframe in these cases is rather small because each withColumns call creates a new server DataFrame.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Existing tests. I have added 
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1427562140


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3469,6 +3470,52 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Rewrite [[UnresolvedWithColumns]] into a [[Project]].
+   *
+   * `withColumns` can both update an existing columns and add new columns.
+   */
+  object RewriteWithColumns extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(UNRESOLVED_WITH_COLUMNS), ruleId) {
+      case UnresolvedWithColumns(projectList, child) if child.resolved =>
+        SchemaUtils.checkColumnNameDuplication(
+          projectList.map(_.name),
+          resolver)
+        val usedProjectListIndices = mutable.Set.empty[Int]
+        val outputProjectListBuilder = Seq.newBuilder[NamedExpression]
+        val lookup = {

Review Comment:
   nvm, I misread your code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1414292523


##########
connector/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -800,6 +804,9 @@ message WithColumns {
   //
   // An exception is thrown when duplicated names are present in the mapping.
   repeated Expression.Alias aliases = 2;
+
+  // (Optional when `aliases` is set)

Review Comment:
   Need more documentation.
   
   For the curious reader. I did not remove `aliases` to retain backwards compatibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "HyukjinKwon (via GitHub)" <gi...@apache.org>.
HyukjinKwon commented on PR #44162:
URL: https://github.com/apache/spark/pull/44162#issuecomment-1839803616

   - I don't think there's one if you meant about Python Spark Connect client. The place to put the test would be `spark/python/pyspark/sql/tests/connect/test_connect_plan.py` but I can add the test for a followup.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1427562393


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -872,6 +872,20 @@ case class TempResolvedColumn(
   final override val nodePatterns: Seq[TreePattern] = Seq(TEMP_RESOLVED_COLUMN)
 }
 
+/**
+<<<<<<< HEAD

Review Comment:
   nit



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1428342045


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -872,6 +872,20 @@ case class TempResolvedColumn(
   final override val nodePatterns: Seq[TreePattern] = Seq(TEMP_RESOLVED_COLUMN)
 }
 
+/**
+<<<<<<< HEAD
+ * Represents a `withColumns` operation. This operator both replaces existing columns
+ * (based on the name), or appends new columns.
+ */
+case class UnresolvedWithColumns(
+    projectList: Seq[NamedExpression],

Review Comment:
   but what we really need is `Seq[String, Expression]`. Not sure if we want to extend this operator more in the future.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1416044264


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2835,29 +2835,12 @@ class Dataset[T] private[sql](
     require(colNames.size == cols.size,
       s"The size of column names: ${colNames.size} isn't equal to " +
         s"the size of columns: ${cols.size}")
-    SchemaUtils.checkColumnNameDuplication(
-      colNames,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-    val resolver = sparkSession.sessionState.analyzer.resolver
-    val output = queryExecution.analyzed.output
-
-    val columnSeq = colNames.zip(cols)
-
-    val replacedAndExistingColumns = output.map { field =>
-      columnSeq.find { case (colName, _) =>
-        resolver(field.name, colName)
-      } match {
-        case Some((colName: String, col: Column)) => col.as(colName)
-        case _ => Column(field)
+    withPlan {
+      val aliases = cols.zip(colNames).map {
+        case (column, name) => Alias(column.expr, name)()
       }
+      UnresolvedWithColumns(aliases, logicalPlan)

Review Comment:
   @cloud-fan I have updated he implementation.



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2835,29 +2835,12 @@ class Dataset[T] private[sql](
     require(colNames.size == cols.size,
       s"The size of column names: ${colNames.size} isn't equal to " +
         s"the size of columns: ${cols.size}")
-    SchemaUtils.checkColumnNameDuplication(
-      colNames,
-      sparkSession.sessionState.conf.caseSensitiveAnalysis)
-
-    val resolver = sparkSession.sessionState.analyzer.resolver
-    val output = queryExecution.analyzed.output
-
-    val columnSeq = colNames.zip(cols)
-
-    val replacedAndExistingColumns = output.map { field =>
-      columnSeq.find { case (colName, _) =>
-        resolver(field.name, colName)
-      } match {
-        case Some((colName: String, col: Column)) => col.as(colName)
-        case _ => Column(field)
+    withPlan {
+      val aliases = cols.zip(colNames).map {
+        case (column, name) => Alias(column.expr, name)()
       }
+      UnresolvedWithColumns(aliases, logicalPlan)

Review Comment:
   @cloud-fan I have updated the 'old' implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1416661147


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3469,6 +3470,52 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Rewrite [[UnresolvedWithColumns]] into a [[Project]].
+   *
+   * `withColumns` can both update an existing columns and add new columns.
+   */
+  object RewriteWithColumns extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(UNRESOLVED_WITH_COLUMNS), ruleId) {
+      case UnresolvedWithColumns(projectList, child) if child.resolved =>
+        SchemaUtils.checkColumnNameDuplication(
+          projectList.map(_.name),
+          resolver)
+        val usedProjectListIndices = mutable.Set.empty[Int]
+        val outputProjectListBuilder = Seq.newBuilder[NamedExpression]
+        val lookup = {

Review Comment:
   Yea we still need the `caseSensitiveLookup`, but we don't need to keep track a set of indices.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #44162:
URL: https://github.com/apache/spark/pull/44162#issuecomment-1839785603

   cc @grundprinzip


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] closed pull request #44162: [SPARK-46225][CONNECT] Collapse withColumns calls
URL: https://github.com/apache/spark/pull/44162


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #44162:
URL: https://github.com/apache/spark/pull/44162#issuecomment-1839188769

   For the reviewers, I still need to add this to the python client.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on PR #44162:
URL: https://github.com/apache/spark/pull/44162#issuecomment-1839786320

   @HyukjinKwon is there a test I can use to verify we are actually collapsing the withColumn calls?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1427563351


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -872,6 +872,20 @@ case class TempResolvedColumn(
   final override val nodePatterns: Seq[TreePattern] = Seq(TEMP_RESOLVED_COLUMN)
 }
 
+/**
+<<<<<<< HEAD
+ * Represents a `withColumns` operation. This operator both replaces existing columns
+ * (based on the name), or appends new columns.
+ */
+case class UnresolvedWithColumns(
+    projectList: Seq[NamedExpression],

Review Comment:
   is it guaranteed that the projectList is `Seq[Alias]`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #44162:
URL: https://github.com/apache/spark/pull/44162#issuecomment-2017007999

   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1428165885


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -872,6 +872,20 @@ case class TempResolvedColumn(
   final override val nodePatterns: Seq[TreePattern] = Seq(TEMP_RESOLVED_COLUMN)
 }
 
+/**
+<<<<<<< HEAD
+ * Represents a `withColumns` operation. This operator both replaces existing columns
+ * (based on the name), or appends new columns.
+ */
+case class UnresolvedWithColumns(
+    projectList: Seq[NamedExpression],

Review Comment:
   It sort of is. I like `NamedExpression` a bit better here, because it also allows for MultiAlias and friends.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1414764985


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3469,6 +3470,52 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Rewrite [[UnresolvedWithColumns]] into a [[Project]].
+   *
+   * `withColumns` can both update an existing columns and add new columns.
+   */
+  object RewriteWithColumns extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(UNRESOLVED_WITH_COLUMNS), ruleId) {
+      case UnresolvedWithColumns(projectList, child) if child.resolved =>
+        SchemaUtils.checkColumnNameDuplication(
+          projectList.map(_.name),
+          resolver)
+        val usedProjectListIndices = mutable.Set.empty[Int]
+        val outputProjectListBuilder = Seq.newBuilder[NamedExpression]
+        val lookup = {

Review Comment:
   shall we simplify the code a bit? Basically we only need two list
   ```
   val updatedProjectList = ...
   val extraProjectList = ...
   child.output.foreach { col =>
     if (need to replace) 
       updatedProjectList += newExpr
     else 
       updatedProjectList += col
       extraProjectList += newExpr
   }
   Project(updatedProjectList ++ extraProjectList, ...)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1416043099


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3469,6 +3470,52 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Rewrite [[UnresolvedWithColumns]] into a [[Project]].
+   *
+   * `withColumns` can both update an existing columns and add new columns.
+   */
+  object RewriteWithColumns extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(UNRESOLVED_WITH_COLUMNS), ruleId) {
+      case UnresolvedWithColumns(projectList, child) if child.resolved =>
+        SchemaUtils.checkColumnNameDuplication(
+          projectList.map(_.name),
+          resolver)
+        val usedProjectListIndices = mutable.Set.empty[Int]
+        val outputProjectListBuilder = Seq.newBuilder[NamedExpression]
+        val lookup = {

Review Comment:
   I am not sure how that would work. We only know the appended columns if we have seen all existing columns.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1427562850


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala:
##########
@@ -872,6 +872,20 @@ case class TempResolvedColumn(
   final override val nodePatterns: Seq[TreePattern] = Seq(TEMP_RESOLVED_COLUMN)
 }
 
+/**
+<<<<<<< HEAD
+ * Represents a `withColumns` operation. This operator both replaces existing columns
+ * (based on the name), or appends new columns.

Review Comment:
   Shall we mention that there can't be duplicated names in the `projectList`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46225][CONNECT] Collapse withColumns calls [spark]

Posted by "hvanhovell (via GitHub)" <gi...@apache.org>.
hvanhovell commented on code in PR #44162:
URL: https://github.com/apache/spark/pull/44162#discussion_r1427140961


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -3469,6 +3470,52 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
     }
   }
 
+  /**
+   * Rewrite [[UnresolvedWithColumns]] into a [[Project]].
+   *
+   * `withColumns` can both update an existing columns and add new columns.
+   */
+  object RewriteWithColumns extends Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning(
+      _.containsPattern(UNRESOLVED_WITH_COLUMNS), ruleId) {
+      case UnresolvedWithColumns(projectList, child) if child.resolved =>
+        SchemaUtils.checkColumnNameDuplication(
+          projectList.map(_.name),
+          resolver)
+        val usedProjectListIndices = mutable.Set.empty[Int]
+        val outputProjectListBuilder = Seq.newBuilder[NamedExpression]
+        val lookup = {

Review Comment:
   So, AFAIK, we still support duplicate names in projects. I can replace the bitmap by something else, but it will still be similar.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org