Posted to reviews@spark.apache.org by "ueshin (via GitHub)" <gi...@apache.org> on 2023/08/08 19:06:19 UTC

[GitHub] [spark] ueshin commented on a diff in pull request #42351: [SPARK-44503][SQL] Project any PARTITION BY expressions not already returned from Python UDTF TABLE arguments

ueshin commented on code in PR #42351:
URL: https://github.com/apache/spark/pull/42351#discussion_r1287537496


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -104,23 +104,77 @@ case class FunctionTableSubqueryArgumentExpression(
     // the query plan.
     var subquery = plan
     if (partitionByExpressions.nonEmpty) {
-      subquery = RepartitionByExpression(
-        partitionExpressions = partitionByExpressions,
-        child = subquery,
-        optNumPartitions = None)
+      // Add a projection to compute each partitioning expression that is not already a simple
+      // attribute present in the plan output. Then add a sort operation by the partition keys
+      // (plus any explicit ORDER BY items), since after the hash-based shuffle operation, the
+      // rows from several partitions may arrive interleaved. In this way, the Python UDTF
+      // evaluator is able to inspect the values of the partitioning expressions for adjacent
+      // rows in order to determine when each partition ends and the next one begins.
+      subquery = Project(
+        projectList = subquery.output ++ extraProjectedPartitioningExpressions,
+        child = subquery)
+      val partitioningAttributes = partitioningExpressionIndexes.map(i => subquery.output(i))
+      subquery = Sort(
+        order = partitioningAttributes.map(e => SortOrder(e, Ascending)) ++ orderByExpressions,
+        global = false,
+        child = RepartitionByExpression(
+          partitionExpressions = partitioningAttributes,
+          optNumPartitions = None,
+          child = subquery))
     }
     if (withSinglePartition) {
       subquery = Repartition(
         numPartitions = 1,
         shuffle = true,
         child = subquery)
-    }
-    if (orderByExpressions.nonEmpty) {
-      subquery = Sort(
-        order = orderByExpressions,
-        global = false,
-        child = subquery)
+      if (orderByExpressions.nonEmpty) {
+        subquery = Sort(
+          order = orderByExpressions,
+          global = false,
+          child = subquery)
+      }
     }
     Project(Seq(Alias(CreateStruct(subquery.output), "c")()), subquery)
   }
+
+  /**
+   * These are the indexes of the PARTITION BY expressions within the concatenation of the child's
+   * output attributes and the [[extraProjectedPartitioningExpressions]]. We send these indexes to
+   * the Python UDTF evaluator so it knows which expressions to compare on adjacent rows to
+   * detect when the partition has changed.
+   */
+  lazy val partitioningExpressionIndexes: Seq[Int] = partitionByExpressions.map { e =>
+    subqueryOutputs.get(e).getOrElse {
+      lazy val extraIndexes = extraProjectedPartitioningExpressions.map(_.child).zipWithIndex.toMap
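
A minimal standalone sketch of the boundary check that the new comment above describes (plain Scala for illustration; the real evaluator runs in Python, and `Row`, `keyIndexes`, and `partitionBoundaries` are hypothetical names): once rows are sorted by the partition keys, a partition boundary is wherever adjacent rows differ on any key.

```scala
// Hypothetical illustration only; not Spark's evaluator code.
object BoundarySketch extends App {
  case class Row(values: Seq[Any])

  // `keyIndexes` plays the role of `partitioningExpressionIndexes`:
  // the positions of the partition keys within each row.
  def partitionBoundaries(rows: Seq[Row], keyIndexes: Seq[Int]): Seq[Int] =
    rows.indices.tail.filter { i =>
      keyIndexes.exists(k => rows(i).values(k) != rows(i - 1).values(k))
    }

  // Rows already sorted by the single partition key at index 0:
  // a new partition starts at row index 2, where the key changes to "b".
  val rows = Seq(Row(Seq("a", 1)), Row(Seq("a", 2)), Row(Seq("b", 3)))
  assert(partitionBoundaries(rows, Seq(0)) == Seq(2))
}
```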

Review Comment:
   nit: I'm wondering if this map gets built only once?
   Might want to extract it out of `partitionByExpressions.map { e => ... }` for readability:
   
   ```scala
   lazy val partitioningExpressionIndexes: Seq[Int] = {
     val extraIndexes = extraProjectedPartitioningExpressions.map(_.child).zipWithIndex.toMap
     partitionByExpressions.map { e =>
       subqueryOutputs.get(e).getOrElse {
         extraIndexes.get(e).get + plan.output.length
       }
     }
   }
   ```
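
As a follow-up on the index arithmetic in the suggestion above, a standalone sketch (hypothetical names, with strings standing in for catalyst expressions): a PARTITION BY expression resolves either to its position in the child plan's output, or to its position among the extra projected expressions offset by the child's output length, since the extras are appended after the child's attributes. Building `extraIndexes` once also avoids reconstructing the map for every expression.

```scala
// Hypothetical illustration only; not the PR's actual code.
object IndexSketch extends App {
  val childOutput = Seq("a", "b")      // stands in for plan.output
  val extraExprs  = Seq("a + 1")       // extraProjectedPartitioningExpressions
  val partitionBy = Seq("b", "a + 1")  // partitionByExpressions

  val subqueryOutputs = childOutput.zipWithIndex.toMap
  val extraIndexes    = extraExprs.zipWithIndex.toMap  // built once, outside the map

  val indexes = partitionBy.map { e =>
    subqueryOutputs.getOrElse(e, extraIndexes(e) + childOutput.length)
  }
  assert(indexes == Seq(1, 2))         // "b" -> 1, "a + 1" -> 0 + 2 = 2
}
```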



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org