You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/06/07 06:58:30 UTC

[GitHub] [spark] HyukjinKwon opened a new pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

HyukjinKwon opened a new pull request #28745:
URL: https://github.com/apache/spark/pull/28745


   ### What changes were proposed in this pull request?
   
   This PR proposes to remove the useless projection in grouped and co-grouped UDFs, which can cause the analysis failure when the grouping column is specified with different upper-lower cases compared to the one specified in the return schema.
   
   Currently, projection is initially added in grouped and cogrouped pandas UDFs to keep the grouping keys. For example,
   
   ```python
   from pyspark.sql.functions import *
   df = spark.createDataFrame([[1, 1]], ["column", "Score"])
   @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
   def my_pandas_udf(pdf):
       return pdf.assign(Score=0.5)
   
   df.groupby('COLUMN').apply(my_pandas_udf).show()
   ```
   
   adds a projection that includes the grouping keys:
   
   ```bash
   == Parsed Logical Plan ==
   'FlatMapGroupsInPandas ['COLUMN], my_pandas_udf(COLUMN#166L, Score#167L), [COLUMN#193, Score#194]
   +- 'Project ['COLUMN, column#166L, Score#167L]  # <---- Here
   ...
   ```
   
   which later causes the reference resolution failure:
   
   ```
   pyspark.sql.utils.AnalysisException: "Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;"
   ```
   
   In fact, we don't need to add the grouping keys at all because grouped and co-grouped pandas UDFs _always_ require to take _all_ columns as input pandas UDF.
   
   After this fix, it will be as below:
   
   ```bash
   == Parsed Logical Plan ==
   'FlatMapGroupsInPandas ['COLUMN], my_pandas_udf(column#0L, Score#1L), [column#9, Score#10]
   +- LogicalRDD [column#0L, Score#1L], false
   ```
   
   
   
   ### Why are the changes needed?
   
   This change will fix two things:
   - Performance improvement by not projecting duplicated columns in case the output schema contains the grouping key.
   - A bug related to the case sensitivity, see below.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes,
   
   
   
   ```python
   from pyspark.sql.functions import *
   df = spark.createDataFrame([[1, 1]], ["column", "Score"])
   @pandas_udf("column integer, Score float", PandasUDFType.GROUPED_MAP)
   def my_pandas_udf(pdf):
       return pdf.assign(Score=0.5)
   
   df.groupby('COLUMN').apply(my_pandas_udf).show()
   ```
   
   ```python
   df1 = spark.createDataFrame([(1, 1)], ("column", "value"))
   df2 = spark.createDataFrame([(1, 1)], ("column", "value"))
   
   df1.groupby("COLUMN").cogroup(
       df2.groupby("COLUMN")
   ).applyInPandas(lambda r, l: r + l, df1.schema).show()
   ```
   
   Before:
   
   ```
   pyspark.sql.utils.AnalysisException: Reference 'COLUMN' is ambiguous, could be: COLUMN, COLUMN.;
   ```
   
   ```
   pyspark.sql.utils.AnalysisException: cannot resolve '`COLUMN`' given input columns: [COLUMN, COLUMN, value, value];;
   'FlatMapCoGroupsInPandas ['COLUMN], ['COLUMN], <lambda>(column#9L, value#10L, column#13L, value#14L), [column#22L, value#23L]
   :- Project [COLUMN#9L, column#9L, value#10L]
   :  +- LogicalRDD [column#9L, value#10L], false
   +- Project [COLUMN#13L, column#13L, value#14L]
      +- LogicalRDD [column#13L, value#14L], false
   ```
   
   
   After:
   
   ```
   +------+-----+
   |column|Score|
   +------+-----+
   |     1|  0.5|
   +------+-----+
   ```
   
   ```
   +------+-----+
   |column|value|
   +------+-----+
   |     2|    2|
   +------+-----+
   ```
   
   ### How was this patch tested?
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641692935


   BTW, I skimmed `Aggregator` related analysis and I think we're all good with the current change if I didn't miss anything.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon edited a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon edited a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641674305


   > will this cause perf regression? e.g. if grouping expr is expensive, with the Project we only need to evaluate it once.
   
   I would say this is kind of a design choice. In the other way, we should add the projection to all grouping expressions, and will need to keep more data intermediately on the other hand.
   
   This PR matches the implementation with existing grouping expressions - it shouldn't be matched with object expressions because grouped and cogrouped UDFs actually should pass a key separately to UDF to use which object expressions don't.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640201907






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640201627


   **[Test build #123598 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123598/testReport)** for PR 28745 at commit [`9522493`](https://github.com/apache/spark/commit/9522493bb65e018adff4333777e078a866a4bedb).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon closed pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon closed pull request #28745:
URL: https://github.com/apache/spark/pull/28745


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r437409581



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2755,11 +2753,23 @@ class Analyzer(
       case f: Filter => f
 
       case a: Aggregate if a.groupingExpressions.exists(!_.deterministic) =>
-        val nondeterToAttr = getNondeterToAttr(a.groupingExpressions)
-        val newChild = Project(a.child.output ++ nondeterToAttr.values, a.child)
-        a.transformExpressions { case e =>
-          nondeterToAttr.get(e).map(_.toAttribute).getOrElse(e)
-        }.copy(child = newChild)
+        projectGroupingExprs(a, a.groupingExpressions)
+
+      case f: FlatMapGroupsInPandas if f.groupingExprs.exists(!_.deterministic) =>

Review comment:
       So, basically this is for the case when grouping expressions are non-deterministic:
   
   ```bash
   == Physical Plan ==
   FlatMapGroupsInPandas [_nondeterministic#14], my_pandas_udf(column#4L, score#6), [column#12, score#13]
   +- *(2) Sort [_nondeterministic#14 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(_nondeterministic#14, 200), true, [id=#19]
         +- *(1) Project [column#4L, score#6, rand(42) AS _nondeterministic#14]  # <--- here to evaluate non-deterministic expression only once.
   ...
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon edited a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon edited a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641674305


   > will this cause perf regression? e.g. if grouping expr is expensive, with the Project we only need to evaluate it once.
   
   I would say this is kind of a design choice. In the other way, we should add the projection to all grouping expressions, and will need to keep more data intermediately on the other hand. This PR matches the implementation - it shouldn't be matched with object expressions because grouped and cogrouped UDFs actually should pass a key separately to UDF to use which object expressions don't.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640205250






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640168198






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640169465






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] TJX2014 commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
TJX2014 commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436361472



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
##########
@@ -23,14 +23,18 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, toPrettySQL}
 import org.apache.spark.sql.types._
 
 object NamedExpression {
   private val curId = new java.util.concurrent.atomic.AtomicLong()
   private[expressions] val jvmId = UUID.randomUUID()
   def newExprId: ExprId = ExprId(curId.getAndIncrement(), jvmId)
   def unapply(expr: NamedExpression): Option[(String, DataType)] = Some((expr.name, expr.dataType))
+  def fromExpression(expr: Expression): NamedExpression = expr match {
+    case ne: NamedExpression => ne
+    case _: Expression => Alias(expr, toPrettySQL(expr))()
+  }

Review comment:
       I find `org.apache.spark.sql.Dataset#groupBy(cols: Column*)` is invoked through py4j instead of `groupBy(col1: String, cols: String*)`, is it possible to change param sent in python side only to invoke `groupBy(col1: String, cols: String*)` :-)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436436434



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
##########
@@ -608,10 +608,14 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.MapPartitionsInRWithArrowExec(
           f, p, b, is, ot, planLater(child)) :: Nil
       case logical.FlatMapGroupsInPandas(grouping, func, output, child) =>
-        execution.python.FlatMapGroupsInPandasExec(grouping, func, output, planLater(child)) :: Nil
-      case logical.FlatMapCoGroupsInPandas(leftGroup, rightGroup, func, output, left, right) =>
+        val groupingExprs = grouping.map(NamedExpression.fromExpression)
+        execution.python.FlatMapGroupsInPandasExec(
+          groupingExprs, func, output, planLater(child)) :: Nil
+      case logical.FlatMapCoGroupsInPandas(leftExprs, rightExprs, func, output, left, right) =>
+        val leftAttrs = leftExprs.map(NamedExpression.fromExpression)
+        val rightAttrs = rightExprs.map(NamedExpression.fromExpression)
         execution.python.FlatMapCoGroupsInPandasExec(
-          leftGroup, rightGroup, func, output, planLater(left), planLater(right)) :: Nil
+          leftAttrs, rightAttrs, func, output, planLater(left), planLater(right)) :: Nil

Review comment:
       leftNamedExprs/rightNamedExprs or leftGroupingExprs/rightGroupingExprs? They are not attributes actually.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala
##########
@@ -59,65 +59,65 @@ private[python] object PandasGroupUtils {
    */
   def groupAndProject(
       input: Iterator[InternalRow],
-      groupingAttributes: Seq[Attribute],
+      groupingExprs: Seq[NamedExpression],
       inputSchema: Seq[Attribute],
-      dedupSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
-    val groupedIter = GroupedIterator(input, groupingAttributes, inputSchema)
+      dedupSchema: Seq[NamedExpression]): Iterator[(InternalRow, Iterator[InternalRow])] = {
+    val groupedIter = GroupedIterator(input, groupingExprs, inputSchema)
     val dedupProj = UnsafeProjection.create(dedupSchema, inputSchema)
     groupedIter.map {
       case (k, groupedRowIter) => (k, groupedRowIter.map(dedupProj))
     }
   }
 
   /**
-   * Returns a the deduplicated attributes of the spark plan and the arg offsets of the
+   * Returns a the deduplicated named expressions of the spark plan and the arg offsets of the
    * keys and values.
    *
-   * The deduplicated attributes are needed because the spark plan may contain an attribute
-   * twice; once in the key and once in the value.  For any such attribute we need to
+   * The deduplicated expressions are needed because the spark plan may contain an expression
+   * twice; once in the key and once in the value.  For any such expression we need to
    * deduplicate.
    *
-   * The arg offsets are used to distinguish grouping grouping attributes and data attributes
+   * The arg offsets are used to distinguish grouping expressions and data expressions
    * as following:
    *
    * argOffsets[0] is the length of the argOffsets array
    *
-   * argOffsets[1] is the length of grouping attribute
-   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping attributes
+   * argOffsets[1] is the length of grouping expression
+   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping expressions
    *
-   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data attributes
+   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data expressions
    */
   def resolveArgOffsets(
-    child: SparkPlan, groupingAttributes: Seq[Attribute]): (Seq[Attribute], Array[Int]) = {
+      dataExprs: Seq[NamedExpression], groupingExprs: Seq[NamedExpression])
+    : (Seq[NamedExpression], Array[Int]) = {
 
-    val dataAttributes = child.output.drop(groupingAttributes.length)
-    val groupingIndicesInData = groupingAttributes.map { attribute =>
-      dataAttributes.indexWhere(attribute.semanticEquals)
+    val groupingIndicesInData = groupingExprs.map { expression =>
+      dataExprs.indexWhere(expression.semanticEquals)
     }

Review comment:
       ok, looks good after re-checking. 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
##########
@@ -60,42 +60,51 @@ case class FlatMapCoGroupsInPandasExec(
   private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
   private val pandasFunction = func.asInstanceOf[PythonUDF].func
   private val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+  private val inputExprs =
+    func.asInstanceOf[PythonUDF].children.map(_.asInstanceOf[NamedExpression])
+  private val leftExprs =
+    left.output.filter(e => inputExprs.exists(_.semanticEquals(e)))
+  private val rightExprs =
+    right.output.filter(e => inputExprs.exists(_.semanticEquals(e)))

Review comment:
       leftAttributes and rightAttributes? 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
##########
@@ -60,42 +60,51 @@ case class FlatMapCoGroupsInPandasExec(
   private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
   private val pandasFunction = func.asInstanceOf[PythonUDF].func
   private val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+  private val inputExprs =
+    func.asInstanceOf[PythonUDF].children.map(_.asInstanceOf[NamedExpression])
+  private val leftExprs =
+    left.output.filter(e => inputExprs.exists(_.semanticEquals(e)))
+  private val rightExprs =
+    right.output.filter(e => inputExprs.exists(_.semanticEquals(e)))
 
   override def producedAttributes: AttributeSet = AttributeSet(output)
 
   override def outputPartitioning: Partitioning = left.outputPartitioning
 
   override def requiredChildDistribution: Seq[Distribution] = {
-    val leftDist = if (leftGroup.isEmpty) AllTuples else ClusteredDistribution(leftGroup)
-    val rightDist = if (rightGroup.isEmpty) AllTuples else ClusteredDistribution(rightGroup)
+    val leftDist =
+      if (leftGroupingExprs.isEmpty) AllTuples else ClusteredDistribution(leftGroupingExprs)
+    val rightDist =
+      if (rightGroupingExprs.isEmpty) AllTuples else ClusteredDistribution(rightGroupingExprs)
     leftDist :: rightDist :: Nil
   }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    leftGroup
-      .map(SortOrder(_, Ascending)) :: rightGroup.map(SortOrder(_, Ascending)) :: Nil
+    leftGroupingExprs
+      .map(SortOrder(_, Ascending)) :: rightGroupingExprs.map(SortOrder(_, Ascending)) :: Nil
   }
 
   override protected def doExecute(): RDD[InternalRow] = {
 
-    val (leftDedup, leftArgOffsets) = resolveArgOffsets(left, leftGroup)
-    val (rightDedup, rightArgOffsets) = resolveArgOffsets(right, rightGroup)
+    val (leftDedup, leftArgOffsets) = resolveArgOffsets(leftExprs, leftGroupingExprs)
+    val (rightDedup, rightArgOffsets) = resolveArgOffsets(rightExprs, rightGroupingExprs)
 
     // Map cogrouped rows to ArrowPythonRunner results, Only execute if partition is not empty
     left.execute().zipPartitions(right.execute())  { (leftData, rightData) =>
       if (leftData.isEmpty && rightData.isEmpty) Iterator.empty else {
 
-        val leftGrouped = groupAndProject(leftData, leftGroup, left.output, leftDedup)
-        val rightGrouped = groupAndProject(rightData, rightGroup, right.output, rightDedup)
-        val data = new CoGroupedIterator(leftGrouped, rightGrouped, leftGroup)
+        val leftGrouped = groupAndProject(leftData, leftGroupingExprs, left.output, leftDedup)
+        val rightGrouped = groupAndProject(rightData, rightGroupingExprs, right.output, rightDedup)

Review comment:
       One disadvantage I can think of is, previously we evaluate grouping expressions in underlying projection. Now we move the grouping expression evaluation inside `FlatMapCoGroupsInPandasExec` execution.
   
   As we requires specified child distribution `leftGroupingExprs` and `rightGroupingExprs` in `requiredChildDistribution`. We would possibly add shuffle below `FlatMapCoGroupsInPandasExec`. That's said we evaluate grouping expressions twice and if any non-deterministic expressions inside, we probably get incorrect results.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436560561



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapCoGroupsInPandasExec.scala
##########
@@ -60,42 +60,51 @@ case class FlatMapCoGroupsInPandasExec(
   private val pythonRunnerConf = ArrowUtils.getPythonRunnerConfMap(conf)
   private val pandasFunction = func.asInstanceOf[PythonUDF].func
   private val chainedFunc = Seq(ChainedPythonFunctions(Seq(pandasFunction)))
+  private val inputExprs =
+    func.asInstanceOf[PythonUDF].children.map(_.asInstanceOf[NamedExpression])
+  private val leftExprs =
+    left.output.filter(e => inputExprs.exists(_.semanticEquals(e)))
+  private val rightExprs =
+    right.output.filter(e => inputExprs.exists(_.semanticEquals(e)))
 
   override def producedAttributes: AttributeSet = AttributeSet(output)
 
   override def outputPartitioning: Partitioning = left.outputPartitioning
 
   override def requiredChildDistribution: Seq[Distribution] = {
-    val leftDist = if (leftGroup.isEmpty) AllTuples else ClusteredDistribution(leftGroup)
-    val rightDist = if (rightGroup.isEmpty) AllTuples else ClusteredDistribution(rightGroup)
+    val leftDist =
+      if (leftGroupingExprs.isEmpty) AllTuples else ClusteredDistribution(leftGroupingExprs)
+    val rightDist =
+      if (rightGroupingExprs.isEmpty) AllTuples else ClusteredDistribution(rightGroupingExprs)
     leftDist :: rightDist :: Nil
   }
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] = {
-    leftGroup
-      .map(SortOrder(_, Ascending)) :: rightGroup.map(SortOrder(_, Ascending)) :: Nil
+    leftGroupingExprs
+      .map(SortOrder(_, Ascending)) :: rightGroupingExprs.map(SortOrder(_, Ascending)) :: Nil
   }
 
   override protected def doExecute(): RDD[InternalRow] = {
 
-    val (leftDedup, leftArgOffsets) = resolveArgOffsets(left, leftGroup)
-    val (rightDedup, rightArgOffsets) = resolveArgOffsets(right, rightGroup)
+    val (leftDedup, leftArgOffsets) = resolveArgOffsets(leftExprs, leftGroupingExprs)
+    val (rightDedup, rightArgOffsets) = resolveArgOffsets(rightExprs, rightGroupingExprs)
 
     // Map cogrouped rows to ArrowPythonRunner results, Only execute if partition is not empty
     left.execute().zipPartitions(right.execute())  { (leftData, rightData) =>
       if (leftData.isEmpty && rightData.isEmpty) Iterator.empty else {
 
-        val leftGrouped = groupAndProject(leftData, leftGroup, left.output, leftDedup)
-        val rightGrouped = groupAndProject(rightData, rightGroup, right.output, rightDedup)
-        val data = new CoGroupedIterator(leftGrouped, rightGrouped, leftGroup)
+        val leftGrouped = groupAndProject(leftData, leftGroupingExprs, left.output, leftDedup)
+        val rightGrouped = groupAndProject(rightData, rightGroupingExprs, right.output, rightDedup)

Review comment:
       Oh that's actually a very good point. Let me check how other grouping expressions work and update or bring some answers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640201907






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640205250






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641499511


   **[Test build #123698 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123698/testReport)** for PR 28745 at commit [`2bf09c0`](https://github.com/apache/spark/commit/2bf09c0b2b009ddea5d9fe1b08c52bcee705e2f7).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640170017


   **[Test build #123599 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123599/testReport)** for PR 28745 at commit [`2800eb2`](https://github.com/apache/spark/commit/2800eb238465547074498eae762199a53efc4277).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641674305


   > will this cause perf regression? e.g. if grouping expr is expensive, with the Project we only need to evaluate it once.
   
   I would say this is kind of a design choice. In that why, we should add the projection to all grouping expressions. It will send less data on the other hand. This PR matches the implementation - it shouldn't be matched with object expressions because grouped and cogrouped UDFs actually should pass a key separately to UDF which object expressions don't.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641500668






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640204902


   **[Test build #123599 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123599/testReport)** for PR 28745 at commit [`2800eb2`](https://github.com/apache/spark/commit/2800eb238465547074498eae762199a53efc4277).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640168198






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641432882


   will this cause perf regression? e.g. if grouping expr is expensive, with the Project we only need to evaluate it once.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640168086


   **[Test build #123598 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123598/testReport)** for PR 28745 at commit [`9522493`](https://github.com/apache/spark/commit/9522493bb65e018adff4333777e078a866a4bedb).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436424768



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
##########
@@ -23,14 +23,18 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, toPrettySQL}
 import org.apache.spark.sql.types._
 
 object NamedExpression {
   private val curId = new java.util.concurrent.atomic.AtomicLong()
   private[expressions] val jvmId = UUID.randomUUID()
   def newExprId: ExprId = ExprId(curId.getAndIncrement(), jvmId)
   def unapply(expr: NamedExpression): Option[(String, DataType)] = Some((expr.name, expr.dataType))
+  def fromExpression(expr: Expression): NamedExpression = expr match {
+    case ne: NamedExpression => ne
+    case _: Expression => Alias(expr, toPrettySQL(expr))()
+  }

Review comment:
       Yeah, let me take a look separate with a separate JIRA.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436425399



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala
##########
@@ -59,65 +59,65 @@ private[python] object PandasGroupUtils {
    */
   def groupAndProject(
       input: Iterator[InternalRow],
-      groupingAttributes: Seq[Attribute],
+      groupingExprs: Seq[NamedExpression],
       inputSchema: Seq[Attribute],
-      dedupSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
-    val groupedIter = GroupedIterator(input, groupingAttributes, inputSchema)
+      dedupSchema: Seq[NamedExpression]): Iterator[(InternalRow, Iterator[InternalRow])] = {
+    val groupedIter = GroupedIterator(input, groupingExprs, inputSchema)
     val dedupProj = UnsafeProjection.create(dedupSchema, inputSchema)
     groupedIter.map {
       case (k, groupedRowIter) => (k, groupedRowIter.map(dedupProj))
     }
   }
 
   /**
-   * Returns a the deduplicated attributes of the spark plan and the arg offsets of the
+   * Returns a the deduplicated named expressions of the spark plan and the arg offsets of the
    * keys and values.
    *
-   * The deduplicated attributes are needed because the spark plan may contain an attribute
-   * twice; once in the key and once in the value.  For any such attribute we need to
+   * The deduplicated expressions are needed because the spark plan may contain an expression
+   * twice; once in the key and once in the value.  For any such expression we need to
    * deduplicate.
    *
-   * The arg offsets are used to distinguish grouping grouping attributes and data attributes
+   * The arg offsets are used to distinguish grouping expressions and data expressions
    * as following:
    *
    * argOffsets[0] is the length of the argOffsets array
    *
-   * argOffsets[1] is the length of grouping attribute
-   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping attributes
+   * argOffsets[1] is the length of grouping expression
+   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping expressions
    *
-   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data attributes
+   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data expressions
    */
   def resolveArgOffsets(
-    child: SparkPlan, groupingAttributes: Seq[Attribute]): (Seq[Attribute], Array[Int]) = {
+      dataExprs: Seq[NamedExpression], groupingExprs: Seq[NamedExpression])
+    : (Seq[NamedExpression], Array[Int]) = {
 
-    val dataAttributes = child.output.drop(groupingAttributes.length)
-    val groupingIndicesInData = groupingAttributes.map { attribute =>
-      dataAttributes.indexWhere(attribute.semanticEquals)
+    val groupingIndicesInData = groupingExprs.map { expression =>
+      dataExprs.indexWhere(expression.semanticEquals)
     }

Review comment:
       Just for doubly sure, `column + 1` case is being tested at https://github.com/apache/spark/blob/ab0890bdb18dcd0441f6082afbe4c84219611e87/python/pyspark/sql/tests/test_pandas_cogrouped_map.py#L161-L174




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640168198






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436425345



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala
##########
@@ -59,65 +59,65 @@ private[python] object PandasGroupUtils {
    */
   def groupAndProject(
       input: Iterator[InternalRow],
-      groupingAttributes: Seq[Attribute],
+      groupingExprs: Seq[NamedExpression],
       inputSchema: Seq[Attribute],
-      dedupSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
-    val groupedIter = GroupedIterator(input, groupingAttributes, inputSchema)
+      dedupSchema: Seq[NamedExpression]): Iterator[(InternalRow, Iterator[InternalRow])] = {
+    val groupedIter = GroupedIterator(input, groupingExprs, inputSchema)
     val dedupProj = UnsafeProjection.create(dedupSchema, inputSchema)
     groupedIter.map {
       case (k, groupedRowIter) => (k, groupedRowIter.map(dedupProj))
     }
   }
 
   /**
-   * Returns a the deduplicated attributes of the spark plan and the arg offsets of the
+   * Returns a the deduplicated named expressions of the spark plan and the arg offsets of the
    * keys and values.
    *
-   * The deduplicated attributes are needed because the spark plan may contain an attribute
-   * twice; once in the key and once in the value.  For any such attribute we need to
+   * The deduplicated expressions are needed because the spark plan may contain an expression
+   * twice; once in the key and once in the value.  For any such expression we need to
    * deduplicate.
    *
-   * The arg offsets are used to distinguish grouping grouping attributes and data attributes
+   * The arg offsets are used to distinguish grouping expressions and data expressions
    * as following:
    *
    * argOffsets[0] is the length of the argOffsets array
    *
-   * argOffsets[1] is the length of grouping attribute
-   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping attributes
+   * argOffsets[1] is the length of grouping expression
+   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping expressions
    *
-   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data attributes
+   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data expressions
    */
   def resolveArgOffsets(
-    child: SparkPlan, groupingAttributes: Seq[Attribute]): (Seq[Attribute], Array[Int]) = {
+      dataExprs: Seq[NamedExpression], groupingExprs: Seq[NamedExpression])
+    : (Seq[NamedExpression], Array[Int]) = {
 
-    val dataAttributes = child.output.drop(groupingAttributes.length)
-    val groupingIndicesInData = groupingAttributes.map { attribute =>
-      dataAttributes.indexWhere(attribute.semanticEquals)
+    val groupingIndicesInData = groupingExprs.map { expression =>
+      dataExprs.indexWhere(expression.semanticEquals)
     }

Review comment:
       Just for doubly sure, `column + 1` case is being tested at https://github.com/apache/spark/blob/ab0890bdb18dcd0441f6082afbe4c84219611e87/python/pyspark/sql/tests/test_pandas_cogrouped_map.py#L161-L174
   I know it because it failed during I prepare the fix :D.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon edited a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon edited a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641692935


   BTW, I skimmed `Aggregate` related analysis and I think we're all good with the current change if I didn't miss anything.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641775258


   Okay, let me try to just workaround the problem alone .. it's too invasive ..


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641289193


   **[Test build #123698 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123698/testReport)** for PR 28745 at commit [`2bf09c0`](https://github.com/apache/spark/commit/2bf09c0b2b009ddea5d9fe1b08c52bcee705e2f7).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436333468



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
##########
@@ -23,14 +23,18 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, toPrettySQL}
 import org.apache.spark.sql.types._
 
 object NamedExpression {
   private val curId = new java.util.concurrent.atomic.AtomicLong()
   private[expressions] val jvmId = UUID.randomUUID()
   def newExprId: ExprId = ExprId(curId.getAndIncrement(), jvmId)
   def unapply(expr: NamedExpression): Option[(String, DataType)] = Some((expr.name, expr.dataType))
+  def fromExpression(expr: Expression): NamedExpression = expr match {
+    case ne: NamedExpression => ne
+    case _: Expression => Alias(expr, toPrettySQL(expr))()
+  }

Review comment:
       I will send another PR to use this in other places, for example,
   
   https://github.com/apache/spark/blob/a3a42b30d04009282e770c289b043ca5941e32e5/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/AggUtils.scala#L147-L150
   
   https://github.com/apache/spark/blob/6c80ebbccb7f354f645dd63a73114332d26f901f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala#L361-L364
   
   https://github.com/apache/spark/blob/17857f9b8bdf95eb64735eb986e86bf8fd8bc1a9/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala#L83-L89
   
   and possibly at:
   
   https://github.com/apache/spark/blob/7195a18bf24d9506d2f8d9d4d93ff679b3d21b65/sql/core/src/main/scala/org/apache/spark/sql/Column.scala#L204
   
   I can don't add this util here for now too if anyone is not sure on this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436424483



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala
##########
@@ -59,65 +59,65 @@ private[python] object PandasGroupUtils {
    */
   def groupAndProject(
       input: Iterator[InternalRow],
-      groupingAttributes: Seq[Attribute],
+      groupingExprs: Seq[NamedExpression],
       inputSchema: Seq[Attribute],
-      dedupSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
-    val groupedIter = GroupedIterator(input, groupingAttributes, inputSchema)
+      dedupSchema: Seq[NamedExpression]): Iterator[(InternalRow, Iterator[InternalRow])] = {
+    val groupedIter = GroupedIterator(input, groupingExprs, inputSchema)
     val dedupProj = UnsafeProjection.create(dedupSchema, inputSchema)
     groupedIter.map {
       case (k, groupedRowIter) => (k, groupedRowIter.map(dedupProj))
     }
   }
 
   /**
-   * Returns a the deduplicated attributes of the spark plan and the arg offsets of the
+   * Returns a the deduplicated named expressions of the spark plan and the arg offsets of the
    * keys and values.
    *
-   * The deduplicated attributes are needed because the spark plan may contain an attribute
-   * twice; once in the key and once in the value.  For any such attribute we need to
+   * The deduplicated expressions are needed because the spark plan may contain an expression
+   * twice; once in the key and once in the value.  For any such expression we need to
    * deduplicate.
    *
-   * The arg offsets are used to distinguish grouping grouping attributes and data attributes
+   * The arg offsets are used to distinguish grouping expressions and data expressions
    * as following:
    *
    * argOffsets[0] is the length of the argOffsets array
    *
-   * argOffsets[1] is the length of grouping attribute
-   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping attributes
+   * argOffsets[1] is the length of grouping expression
+   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping expressions
    *
-   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data attributes
+   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data expressions
    */
   def resolveArgOffsets(
-    child: SparkPlan, groupingAttributes: Seq[Attribute]): (Seq[Attribute], Array[Int]) = {
+      dataExprs: Seq[NamedExpression], groupingExprs: Seq[NamedExpression])
+    : (Seq[NamedExpression], Array[Int]) = {
 
-    val dataAttributes = child.output.drop(groupingAttributes.length)
-    val groupingIndicesInData = groupingAttributes.map { attribute =>
-      dataAttributes.indexWhere(attribute.semanticEquals)
+    val groupingIndicesInData = groupingExprs.map { expression =>
+      dataExprs.indexWhere(expression.semanticEquals)
     }

Review comment:
       Actually the `groupingExprs` will be projected at https://github.com/apache/spark/pull/28745/files/2800eb238465547074498eae762199a53efc4277#diff-e7c34a6080e15837af82863db34fb1c4R66 for the input iterator before the actual execution.
   
   The `groupingExprs` were already dropped in this code without this fix https://github.com/apache/spark/pull/28745/files/2800eb238465547074498eae762199a53efc4277#diff-e7c34a6080e15837af82863db34fb1c4L93
   
   I believe there's no difference virtually in the execution path here.
   
   For analysis,
   
   With this change: `groupingExprs` at `FlatMapGroupsInPandasExec`, for example, `column + 1`.  The attributes `column` inside `column + 1` will be properly resolved, and then it becomes an alias to project later during execution.
   
   Without this change: `Project`'s output contains the grouping expression as a separate attribute reference, `column + 1` (whereas the current fix keeps it as an expression). `FlatMapGroupsInPandasExec` contains the attribute reference as a grouping expression , and this grouping attribute will be used to project later.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] TJX2014 commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
TJX2014 commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436361472



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
##########
@@ -23,14 +23,18 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
-import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.catalyst.util.{quoteIdentifier, toPrettySQL}
 import org.apache.spark.sql.types._
 
 object NamedExpression {
   private val curId = new java.util.concurrent.atomic.AtomicLong()
   private[expressions] val jvmId = UUID.randomUUID()
   def newExprId: ExprId = ExprId(curId.getAndIncrement(), jvmId)
   def unapply(expr: NamedExpression): Option[(String, DataType)] = Some((expr.name, expr.dataType))
+  def fromExpression(expr: Expression): NamedExpression = expr match {
+    case ne: NamedExpression => ne
+    case _: Expression => Alias(expr, toPrettySQL(expr))()
+  }

Review comment:
       I find `org.apache.spark.sql.Dataset#groupBy(cols: Column*)` is invoked through py4j instead of `groupBy(col1: String, cols: String*)`, is it possible to change param sent in python side only to invoke `groupBy(col1: String, cols: String*)`, which may also be helpful to this jira :-)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HyukjinKwon commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436424483



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala
##########
@@ -59,65 +59,65 @@ private[python] object PandasGroupUtils {
    */
   def groupAndProject(
       input: Iterator[InternalRow],
-      groupingAttributes: Seq[Attribute],
+      groupingExprs: Seq[NamedExpression],
       inputSchema: Seq[Attribute],
-      dedupSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
-    val groupedIter = GroupedIterator(input, groupingAttributes, inputSchema)
+      dedupSchema: Seq[NamedExpression]): Iterator[(InternalRow, Iterator[InternalRow])] = {
+    val groupedIter = GroupedIterator(input, groupingExprs, inputSchema)
     val dedupProj = UnsafeProjection.create(dedupSchema, inputSchema)
     groupedIter.map {
       case (k, groupedRowIter) => (k, groupedRowIter.map(dedupProj))
     }
   }
 
   /**
-   * Returns a the deduplicated attributes of the spark plan and the arg offsets of the
+   * Returns a the deduplicated named expressions of the spark plan and the arg offsets of the
    * keys and values.
    *
-   * The deduplicated attributes are needed because the spark plan may contain an attribute
-   * twice; once in the key and once in the value.  For any such attribute we need to
+   * The deduplicated expressions are needed because the spark plan may contain an expression
+   * twice; once in the key and once in the value.  For any such expression we need to
    * deduplicate.
    *
-   * The arg offsets are used to distinguish grouping grouping attributes and data attributes
+   * The arg offsets are used to distinguish grouping expressions and data expressions
    * as following:
    *
    * argOffsets[0] is the length of the argOffsets array
    *
-   * argOffsets[1] is the length of grouping attribute
-   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping attributes
+   * argOffsets[1] is the length of grouping expression
+   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping expressions
    *
-   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data attributes
+   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data expressions
    */
   def resolveArgOffsets(
-    child: SparkPlan, groupingAttributes: Seq[Attribute]): (Seq[Attribute], Array[Int]) = {
+      dataExprs: Seq[NamedExpression], groupingExprs: Seq[NamedExpression])
+    : (Seq[NamedExpression], Array[Int]) = {
 
-    val dataAttributes = child.output.drop(groupingAttributes.length)
-    val groupingIndicesInData = groupingAttributes.map { attribute =>
-      dataAttributes.indexWhere(attribute.semanticEquals)
+    val groupingIndicesInData = groupingExprs.map { expression =>
+      dataExprs.indexWhere(expression.semanticEquals)
     }

Review comment:
       Actually the `groupingExprs` will be projected at https://github.com/apache/spark/pull/28745/files/2800eb238465547074498eae762199a53efc4277#diff-e7c34a6080e15837af82863db34fb1c4R66 for the input iterator before the actual execution.
   
   The `groupingExprs` were already dropped in this code without this fix https://github.com/apache/spark/pull/28745/files/2800eb238465547074498eae762199a53efc4277#diff-e7c34a6080e15837af82863db34fb1c4L93
   
   I believe there's no difference virtually in the execution path here.
   
   For analysis,
   
   With this change: `groupingExprs` at `FlatMapGroupsInPandasExec`, for example, `column + 1`.  The attributes `column` inside `column + 1` will be properly resolved, and then it becomes an alias to project later during execution.
   
   Without this change: `Project`'s output contains the grouping expression as a separate attribute reference, `column + 1` (whereas the current fix keeps it as an expression). `FlatMapGroupsInPandasExec` contains the attribute reference as a grouping expression , and this grouping expression will be used to project later.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641290863






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641289193


   **[Test build #123698 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123698/testReport)** for PR 28745 at commit [`2bf09c0`](https://github.com/apache/spark/commit/2bf09c0b2b009ddea5d9fe1b08c52bcee705e2f7).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA removed a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640170017


   **[Test build #123599 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123599/testReport)** for PR 28745 at commit [`2800eb2`](https://github.com/apache/spark/commit/2800eb238465547074498eae762199a53efc4277).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641500668






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640169465






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] SparkQA commented on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-640168086


   **[Test build #123598 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/123598/testReport)** for PR 28745 at commit [`9522493`](https://github.com/apache/spark/commit/9522493bb65e018adff4333777e078a866a4bedb).


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a change in pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #28745:
URL: https://github.com/apache/spark/pull/28745#discussion_r436380321



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/python/PandasGroupUtils.scala
##########
@@ -59,65 +59,65 @@ private[python] object PandasGroupUtils {
    */
   def groupAndProject(
       input: Iterator[InternalRow],
-      groupingAttributes: Seq[Attribute],
+      groupingExprs: Seq[NamedExpression],
       inputSchema: Seq[Attribute],
-      dedupSchema: Seq[Attribute]): Iterator[(InternalRow, Iterator[InternalRow])] = {
-    val groupedIter = GroupedIterator(input, groupingAttributes, inputSchema)
+      dedupSchema: Seq[NamedExpression]): Iterator[(InternalRow, Iterator[InternalRow])] = {
+    val groupedIter = GroupedIterator(input, groupingExprs, inputSchema)
     val dedupProj = UnsafeProjection.create(dedupSchema, inputSchema)
     groupedIter.map {
       case (k, groupedRowIter) => (k, groupedRowIter.map(dedupProj))
     }
   }
 
   /**
-   * Returns a the deduplicated attributes of the spark plan and the arg offsets of the
+   * Returns a the deduplicated named expressions of the spark plan and the arg offsets of the
    * keys and values.
    *
-   * The deduplicated attributes are needed because the spark plan may contain an attribute
-   * twice; once in the key and once in the value.  For any such attribute we need to
+   * The deduplicated expressions are needed because the spark plan may contain an expression
+   * twice; once in the key and once in the value.  For any such expression we need to
    * deduplicate.
    *
-   * The arg offsets are used to distinguish grouping grouping attributes and data attributes
+   * The arg offsets are used to distinguish grouping expressions and data expressions
    * as following:
    *
    * argOffsets[0] is the length of the argOffsets array
    *
-   * argOffsets[1] is the length of grouping attribute
-   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping attributes
+   * argOffsets[1] is the length of grouping expression
+   * argOffsets[2 .. argOffsets[0]+2] is the arg offsets for grouping expressions
    *
-   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data attributes
+   * argOffsets[argOffsets[0]+2 .. ] is the arg offsets for data expressions
    */
   def resolveArgOffsets(
-    child: SparkPlan, groupingAttributes: Seq[Attribute]): (Seq[Attribute], Array[Int]) = {
+      dataExprs: Seq[NamedExpression], groupingExprs: Seq[NamedExpression])
+    : (Seq[NamedExpression], Array[Int]) = {
 
-    val dataAttributes = child.output.drop(groupingAttributes.length)
-    val groupingIndicesInData = groupingAttributes.map { attribute =>
-      dataAttributes.indexWhere(attribute.semanticEquals)
+    val groupingIndicesInData = groupingExprs.map { expression =>
+      dataExprs.indexWhere(expression.semanticEquals)
     }

Review comment:
       I feel this looks not precisely correct at all cases. Seems `dataExprs` are inputs to Python UDFs. Is it possible that `groupingExprs` are not just child's outputs but expressions like `column + 1`? 
   
   In `RelationalGroupedDataset`, we added one projection previously to put these grouping expressions with original child's outputs. Now we don't have it. So can we always find semantically equal expr in `dataExprs` for a grouping expression? `dataExprs` are input expressions in left/right plan for `FlatMapCoGroupsInPandasExec`, so I guess we cannot find `column + 1` in it.
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #28745: [SPARK-31915][SQL][PYTHON] Remove projection that adds grouping keys in grouped and cogrouped pandas UDFs

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #28745:
URL: https://github.com/apache/spark/pull/28745#issuecomment-641290863






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org