Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2023/07/26 16:43:35 UTC

[GitHub] [spark] dtenedor opened a new pull request, #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

dtenedor opened a new pull request, #42174:
URL: https://github.com/apache/spark/pull/42174

   ### What changes were proposed in this pull request?
   
   This PR adds analysis and planning for the PARTITION BY and ORDER BY clauses after TABLE arguments in TVF calls.
   
   ### Why are the changes needed?
   
   This gives the TVF caller a way to specify how the rows of the input table are divided into partitions (and, optionally, ordered within each partition) before the underlying algorithm consumes them.
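To make the intended semantics concrete, here is a pure-Python model (not Spark code). It assumes the behavior described for the TABLE argument: rows are grouped by the PARTITION BY key, each group is optionally sorted by the ORDER BY key, one fresh instance of the table function class consumes each group, and the results are unioned. `SumUDTF` mirrors the `TestUDTF` class from the PR's Python test; `run_table_function` and its parameters are hypothetical names.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Tuple

Row = Dict[str, Any]


class SumUDTF:
    """UDTF-style class: accumulates row["x"] in eval, emits the sum in terminate."""

    def __init__(self) -> None:
        self._sum = 0

    def eval(self, row: Row) -> None:
        self._sum += row["x"]

    def terminate(self):
        yield (self._sum,)


def run_table_function(
    udtf_class: Callable[[], Any],
    rows: List[Row],
    partition_by: Optional[Callable[[Row], Any]] = None,
    order_by: Optional[Callable[[Row], Any]] = None,
) -> List[Tuple[Any, ...]]:
    """Hypothetical model: one UDTF instance per partition, union of all results."""
    partitions: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        # Without PARTITION BY this model uses a single partition; Spark itself
        # leaves the split undefined in that case.
        key = partition_by(row) if partition_by else None
        partitions[key].append(row)

    results: List[Tuple[Any, ...]] = []
    for part in partitions.values():
        if order_by is not None:
            # ORDER BY: rows arrive at eval() in the requested order.
            part = sorted(part, key=order_by)
        instance = udtf_class()  # exactly one instance per partition
        for row in part:
            # eval() may return None, an iterable, or a generator of output rows.
            results.extend(instance.eval(row) or ())
        results.extend(instance.terminate())
    return results
```

For example, with rows `{"k": "a", "x": 1}`, `{"k": "b", "x": 2}`, `{"k": "a", "x": 3}` and `partition_by=lambda r: r["k"]`, the model yields one sum per key, `(4,)` and `(2,)`; without `partition_by` it yields the single total `(6,)`.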
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   This PR adds unit test coverage and also end-to-end tests in Python.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] dtenedor commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1279997463


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -1444,6 +1447,83 @@ def terminate(self):
                     assertSchemaEqual(df.schema, StructType().add("col1", IntegerType()))
                     assertDataFrameEqual(df, [Row(col1=10), Row(col1=100)])
 
+    def test_udtf_call_with_partition_by(self):
+        class TestUDTF:
+            def __init__(self):
+                self._sum = 0
+
+            def eval(self, row: Row):
+                self._sum += row["x"]
+
+            def terminate(self):
+                yield self._sum,
+
+        func = udtf(TestUDTF, returnType="a: int")
+        self.spark.udtf.register("test_udtf_pb", func)
+
+        def actual(query: str) -> str:
+            df = self.spark.sql(query)
+            value = df.collect()[0][0]
+            stripExprIds = re.sub(r'#[\d]+', r'#xx', value)
+            stripPlanIds = re.sub(
+                r'plan_id=[\d]+', r'plan_id=xx', stripExprIds)
+            stripEvalType = re.sub(
+                r'\+- .....EvalPythonUDTF test_udtf_pb.*', r'+- EvalPythonUDTF test_udtf_pb',
+                stripPlanIds)
+            print('Query plan: ' + stripEvalType)
+            return stripEvalType.strip('\n')
+        def expected(input: str) -> str:
+            return textwrap.dedent(input).strip('\n')
+
+        self.assertEqual(

Review Comment:
   Done.
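The `actual` helper in the test above masks run-specific IDs before comparing plan strings. Pulled out into a standalone function, it might look like the sketch below; `normalize_plan` is a hypothetical name, and the sample plan text in the usage note is invented for illustration rather than captured from Spark. The five-character wildcard `.....` matches any prefix of that length (for example `Batch`) in front of `EvalPythonUDTF`.

```python
import re
import textwrap


def normalize_plan(plan: str, udtf_name: str) -> str:
    """Mask run-specific IDs in an EXPLAIN string so it can be compared verbatim."""
    # Expression IDs such as x#123 change between runs.
    text = re.sub(r"#\d+", "#xx", plan)
    # Plan IDs such as plan_id=42 change between runs as well.
    text = re.sub(r"plan_id=\d+", "plan_id=xx", text)
    # Drop the evaluation-type prefix and the trailing details of the UDTF node.
    text = re.sub(
        r"\+- .....EvalPythonUDTF " + re.escape(udtf_name) + r".*",
        lambda _m: "+- EvalPythonUDTF " + udtf_name,
        text,
    )
    return text.strip("\n")


def expected(block: str) -> str:
    """Dedent an indented triple-quoted plan so tests can be written inline."""
    return textwrap.dedent(block).strip("\n")
```

For instance, normalizing `"Project [a#12]\n+- BatchEvalPythonUDTF test_udtf_pb(x#7), [a#12]\n   +- Scan plan_id=3\n"` for `test_udtf_pb` gives `"Project [a#xx]\n+- EvalPythonUDTF test_udtf_pb\n   +- Scan plan_id=xx"`.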





[GitHub] [spark] dtenedor commented on pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #42174:
URL: https://github.com/apache/spark/pull/42174#issuecomment-1652182298

   cc @allisonwang-db @ueshin @HyukjinKwon 




[GitHub] [spark] dtenedor commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1279998257


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2073,6 +2073,13 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                resolvedFunc match {
+                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case _ if t.hasRepartitioning =>
+                    throw QueryCompilationErrors.tableValuedFunctionPartitionByClauseNotSupported(

Review Comment:
   If I recall correctly, Python UDTFs are currently the only type of table function in Spark that supports input relation arguments. Certain Spark forks may define their own, and then this check would apply. (Please let me know if you can think of any way to test this in this PR.)





[GitHub] [spark] cloud-fan commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1280624075


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2073,6 +2073,13 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                resolvedFunc match {
+                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case _ if t.hasRepartitioning =>
+                    throw QueryCompilationErrors.tableValuedFunctionPartitionByClauseNotSupported(

Review Comment:
   can we test the built-in TVFs like `explode`?





[GitHub] [spark] dtenedor commented on pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #42174:
URL: https://github.com/apache/spark/pull/42174#issuecomment-1662992296

   Discussed offline: we decided not to backport any more PRs from this feature to Spark 3.5.




[GitHub] [spark] dtenedor commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1279997355


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -61,5 +75,36 @@ case class FunctionTableSubqueryArgumentExpression(
   final override def nodePatternsInternal: Seq[TreePattern] =
     Seq(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION)
 
-  lazy val evaluable: LogicalPlan = Project(Seq(Alias(CreateStruct(plan.output), "c")()), plan)
+  def hasRepartitioning: Boolean = withSinglePartition || partitionByExpressions.nonEmpty
+
+  lazy val evaluable: LogicalPlan = {
+    val subquery = if (hasRepartitioning) {
+      // If the TABLE argument includes the WITH SINGLE PARTITION or PARTITION BY or ORDER BY
+      // clause(s), add a corresponding logical operator to represent the repartitioning operation
+      // in the query plan.
+      RepartitionForTableFunctionCall(

Review Comment:
   Good idea, I updated the code to use this, now we don't need to add a new operator!



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -32,11 +32,22 @@ import org.apache.spark.sql.types.DataType
  * table in the catalog. In the latter case, the relation argument comprises
  * a table subquery that may itself refer to one or more tables in its own
  * FROM clause.
+ *
+ * Each TABLE argument may also optionally include a PARTITION BY clause. If present, these indicate
+ * how to logically split up the input relation such that the table-valued function evaluates
+ * exactly once for each partition, and returns the union of all results. If no partitioning list is
+ * present, this splitting of the input relation is undefined. Furthermore, if the PARTITION BY
+ * clause includes a following ORDER BY clause, Catalyst will sort the rows in each partition such
+ * that the table-valued function receives them one-by-one in the requested order. Otherwise, if no
+ * such ordering is specified, the ordering of rows within each partition is undefined.

Review Comment:
   Sounds good, done.





[GitHub] [spark] cloud-fan commented on pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #42174:
URL: https://github.com/apache/spark/pull/42174#issuecomment-1660310686

   let's backport to make the feature complete. cc release manager @xuanyuanking 
   
   @dtenedor can you update the PR description? It's outdated, since in the end we don't add new query plans.




[GitHub] [spark] allisonwang-db commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1276818608


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -32,11 +32,22 @@ import org.apache.spark.sql.types.DataType
  * table in the catalog. In the latter case, the relation argument comprises
  * a table subquery that may itself refer to one or more tables in its own
  * FROM clause.
+ *
+ * Each TABLE argument may also optionally include a PARTITION BY clause. If present, these indicate
+ * how to logically split up the input relation such that the table-valued function evaluates
+ * exactly once for each partition, and returns the union of all results. If no partitioning list is
+ * present, this splitting of the input relation is undefined. Furthermore, if the PARTITION BY
+ * clause includes a following ORDER BY clause, Catalyst will sort the rows in each partition such
+ * that the table-valued function receives them one-by-one in the requested order. Otherwise, if no
+ * such ordering is specified, the ordering of rows within each partition is undefined.

Review Comment:
   Maybe we can use the javadoc style here to explain each parameter? `@param`



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -1444,6 +1447,83 @@ def terminate(self):
                     assertSchemaEqual(df.schema, StructType().add("col1", IntegerType()))
                     assertDataFrameEqual(df, [Row(col1=10), Row(col1=100)])
 
+    def test_udtf_call_with_partition_by(self):
+        class TestUDTF:
+            def __init__(self):
+                self._sum = 0
+
+            def eval(self, row: Row):
+                self._sum += row["x"]
+
+            def terminate(self):
+                yield self._sum,
+
+        func = udtf(TestUDTF, returnType="a: int")
+        self.spark.udtf.register("test_udtf_pb", func)
+
+        def actual(query: str) -> str:
+            df = self.spark.sql(query)
+            value = df.collect()[0][0]
+            stripExprIds = re.sub(r'#[\d]+', r'#xx', value)
+            stripPlanIds = re.sub(
+                r'plan_id=[\d]+', r'plan_id=xx', stripExprIds)
+            stripEvalType = re.sub(
+                r'\+- .....EvalPythonUDTF test_udtf_pb.*', r'+- EvalPythonUDTF test_udtf_pb',
+                stripPlanIds)
+            print('Query plan: ' + stripEvalType)
+            return stripEvalType.strip('\n')
+        def expected(input: str) -> str:
+            return textwrap.dedent(input).strip('\n')
+
+        self.assertEqual(

Review Comment:
   This is a great test! But I think it would be better if we add this test to the scala side: `PythonUDTFSuite.scala`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -61,5 +75,36 @@ case class FunctionTableSubqueryArgumentExpression(
   final override def nodePatternsInternal: Seq[TreePattern] =
     Seq(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION)
 
-  lazy val evaluable: LogicalPlan = Project(Seq(Alias(CreateStruct(plan.output), "c")()), plan)
+  def hasRepartitioning: Boolean = withSinglePartition || partitionByExpressions.nonEmpty
+
+  lazy val evaluable: LogicalPlan = {
+    val subquery = if (hasRepartitioning) {
+      // If the TABLE argument includes the WITH SINGLE PARTITION or PARTITION BY or ORDER BY
+      // clause(s), add a corresponding logical operator to represent the repartitioning operation
+      // in the query plan.
+      RepartitionForTableFunctionCall(

Review Comment:
   Just curious, have we considered reusing `Sort` + `RepartitionByExpression`/`Repartition` nodes, instead of having a dedicated logical plan?
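Reusing existing operators could look roughly like the following Python model. The node classes only echo Catalyst's `Repartition`, `RepartitionByExpression`, and `Sort` operators; they are hypothetical stand-ins, as is `build_table_argument_plan`, and using a non-global sort to mean "order rows within each partition" is this sketch's assumption.

```python
from dataclasses import dataclass
from typing import Any, Sequence


# Hypothetical stand-ins for Catalyst logical plan nodes (not Spark classes).
@dataclass(frozen=True)
class Relation:
    name: str


@dataclass(frozen=True)
class Repartition:
    num_partitions: int
    shuffle: bool
    child: Any


@dataclass(frozen=True)
class RepartitionByExpression:
    partition_expressions: Sequence[str]
    child: Any


@dataclass(frozen=True)
class Sort:
    order: Sequence[str]
    is_global: bool
    child: Any


def build_table_argument_plan(
    plan: Any,
    partition_by: Sequence[str] = (),
    with_single_partition: bool = False,
    order_by: Sequence[str] = (),
) -> Any:
    """Wrap the TABLE argument's plan in existing operators instead of a new node."""
    if with_single_partition:
        # WITH SINGLE PARTITION: shuffle every row into one partition.
        plan = Repartition(num_partitions=1, shuffle=True, child=plan)
    elif partition_by:
        # PARTITION BY: hash-repartition on the given expressions.
        plan = RepartitionByExpression(partition_expressions=tuple(partition_by), child=plan)
    if order_by:
        # ORDER BY: sort within each partition only, not globally.
        plan = Sort(order=tuple(order_by), is_global=False, child=plan)
    return plan
```

With `partition_by=["x"]` and `order_by=["y"]` the result is `Sort(("y",), False, RepartitionByExpression(("x",), t))`; with neither clause the input plan comes back unchanged, so no extra shuffle is introduced.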



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2073,6 +2073,13 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                resolvedFunc match {
+                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case _ if t.hasRepartitioning =>
+                    throw QueryCompilationErrors.tableValuedFunctionPartitionByClauseNotSupported(

Review Comment:
   Can we add a test for this case?





[GitHub] [spark] dtenedor commented on pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #42174:
URL: https://github.com/apache/spark/pull/42174#issuecomment-1659381456

   Hi @allisonwang-db, thanks for your review. I have responded to your comments; please take another look 🙏 




[GitHub] [spark] cloud-fan commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1280626633


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2445,8 +2452,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
           InSubquery(values, expr.asInstanceOf[ListQuery])
         case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.resolved =>
           resolveSubQuery(s, outer)(LateralSubquery(_, _, exprId))
-        case a @ FunctionTableSubqueryArgumentExpression(sub, _, exprId) if !sub.resolved =>
-          resolveSubQuery(a, outer)(FunctionTableSubqueryArgumentExpression(_, _, exprId))
+        case a @ FunctionTableSubqueryArgumentExpression(
+            sub, _, exprId, partitionByExpressions, withSinglePartition, orderByExpressions)

Review Comment:
   nit:
   ```
   case a: FunctionTableSubqueryArgumentExpression if !a.plan.resolved =>
     resolveSubQuery(a, outer)((plan, outerAttrs) => a.copy(plan = plan, outerAttrs = outerAttrs))
   ```
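The benefit of `copy` is that any field the rule does not touch is carried over automatically, so adding constructor parameters later cannot silently reset them. A Python analogue using `dataclasses.replace`, where `TableArg` and both helper names are hypothetical:

```python
from dataclasses import dataclass, replace
from typing import Tuple


@dataclass(frozen=True)
class TableArg:
    """Hypothetical stand-in for FunctionTableSubqueryArgumentExpression."""
    plan: str
    outer_attrs: Tuple[str, ...] = ()
    expr_id: int = 0
    partition_by: Tuple[str, ...] = ()
    with_single_partition: bool = False
    order_by: Tuple[str, ...] = ()


def rebuild_positionally(a: TableArg, plan: str, outer_attrs: Tuple[str, ...]) -> TableArg:
    # Like the older `FunctionTableSubqueryArgumentExpression(_, _, exprId)` form:
    # only the fields named here survive; newer fields fall back to defaults.
    return TableArg(plan, outer_attrs, a.expr_id)


def rebuild_with_copy(a: TableArg, plan: str, outer_attrs: Tuple[str, ...]) -> TableArg:
    # Like `a.copy(plan = plan, outerAttrs = outerAttrs)`: every other field is kept.
    return replace(a, plan=plan, outer_attrs=outer_attrs)
```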





[GitHub] [spark] dtenedor commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1280970732


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2073,6 +2073,13 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                resolvedFunc match {
+                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case _ if t.hasRepartitioning =>
+                    throw QueryCompilationErrors.tableValuedFunctionPartitionByClauseNotSupported(

Review Comment:
   I tried this, but got a different data type mismatch error instead. Since I'm unable to exercise this error message, I changed it to an assert for now, and kept the unit test that uses a `TABLE` argument with EXPLODE.



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -14,9 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import os
+import re
 import shutil
 import tempfile
+import textwrap

Review Comment:
   Apologies for missing this, reverted this file now.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2445,8 +2452,12 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
           InSubquery(values, expr.asInstanceOf[ListQuery])
         case s @ LateralSubquery(sub, _, exprId, _, _) if !sub.resolved =>
           resolveSubQuery(s, outer)(LateralSubquery(_, _, exprId))
-        case a @ FunctionTableSubqueryArgumentExpression(sub, _, exprId) if !sub.resolved =>
-          resolveSubQuery(a, outer)(FunctionTableSubqueryArgumentExpression(_, _, exprId))
+        case a @ FunctionTableSubqueryArgumentExpression(
+            sub, _, exprId, partitionByExpressions, withSinglePartition, orderByExpressions)

Review Comment:
   Done, thanks, this is better.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -32,11 +32,40 @@ import org.apache.spark.sql.types.DataType
  * table in the catalog. In the latter case, the relation argument comprises
  * a table subquery that may itself refer to one or more tables in its own
  * FROM clause.
+ *
+ * Each TABLE argument may also optionally include a PARTITION BY clause. If present, these indicate
+ * how to logically split up the input relation such that the table-valued function evaluates
+ * exactly once for each partition, and returns the union of all results. If no partitioning list is
+ * present, this splitting of the input relation is undefined. Furthermore, if the PARTITION BY
+ * clause includes a following ORDER BY clause, Catalyst will sort the rows in each partition such
+ * that the table-valued function receives them one-by-one in the requested order. Otherwise, if no
+ * such ordering is specified, the ordering of rows within each partition is undefined.
+ *
+ * @param plan the logical plan provided as input for the table argument as either a logical
+ *             relation or as a more complex logical plan in the event of a table subquery.
+ * @param outerAttrs outer references of this subquery plan, generally empty since these table
+ *                   arguments do not allow correlated references currently
+ * @param exprId expression ID of this subquery expression, generally generated afresh each time
+ * @param partitionByExpressions if non-empty, the TABLE argument included the PARTITION BY clause
+ *                               to indicate that the input relation should be repartitioned by the
+ *                               hash of the provided expressions, such that all the rows with each
+ *                               unique combination of values of the partitioning expressions will
+ *                               be consumed by exactly one instance of the table function class.
+ * @param withSinglePartition if true, the TABLE argument included the WITH SINGLE PARTITION clause
+ *                            to indicate that the entire input relation should be repartitioned to
+ *                            one worker for consumption by exactly one instance of the table
+ *                            function class.
+ * @param orderByExpressions if non-empty, the TABLE argument included the ORDER BY clause to
+ *                           indicate that the rows within each partition of the table function are
+ *                           to arrive in the provided order.
  */
 case class FunctionTableSubqueryArgumentExpression(
     plan: LogicalPlan,
     outerAttrs: Seq[Expression] = Seq.empty,
-    exprId: ExprId = NamedExpression.newExprId)
+    exprId: ExprId = NamedExpression.newExprId,
+    partitionByExpressions: Seq[Expression] = Seq.empty,
+    withSinglePartition: Boolean = false,
+    orderByExpressions: Seq[SortOrder] = Seq.empty)
   extends SubqueryExpression(plan, outerAttrs, exprId, Seq.empty, None) with Unevaluable {
 

Review Comment:
   Good idea, done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1564,14 +1564,43 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }.getOrElse {
       plan(ctx.query)
     }
-    val partitioning = Option(ctx.tableArgumentPartitioning)
-    if (partitioning.isDefined) {
-      // The PARTITION BY clause is not implemented yet for TABLE arguments to table valued function
-      // calls.
-      operationNotAllowed(
-        "Specifying the PARTITION BY clause for TABLE arguments is not implemented yet", ctx)
-    }
-    FunctionTableSubqueryArgumentExpression(p)
+    var withSinglePartition = false
+    var partitionByExpressions = Seq.empty[Expression]
+    var orderByExpressions = Seq.empty[SortOrder]
+    Option(ctx.tableArgumentPartitioning)
+      .foreach { p =>
+        if (p.SINGLE != null) {
+          withSinglePartition = true
+        }
+      }
+    Option(ctx.tableArgumentPartitioning)
+      .map(_.partition.asScala.map(expression))
+      .foreach { expressions =>
+        if (expressions.nonEmpty) {
+          partitionByExpressions = expressions
+        }
+      }
+    Option(ctx.tableArgumentPartitioning)

Review Comment:
   Thanks, this is better.
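The hunk above reads `ctx.tableArgumentPartitioning` three separate times. One way to gather all three settings in a single pass is sketched below in Python; `PartitioningClause` and `parse_partitioning` are hypothetical names that only model the parser context (its `SINGLE` token, `partition` list, and an ORDER BY list whose real accessor name is not shown in the hunk), not Spark's parser API.

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Tuple


@dataclass
class PartitioningClause:
    """Hypothetical stand-in for the tableArgumentPartitioning parser context."""
    single: bool = False
    partition: Sequence[str] = ()
    order: Sequence[str] = ()


def parse_partitioning(
    clause: Optional[PartitioningClause],
) -> Tuple[bool, Tuple[str, ...], Tuple[str, ...]]:
    """Return (with_single_partition, partition_by, order_by) in one pass."""
    if clause is None:
        # No PARTITION BY / WITH SINGLE PARTITION clause after the TABLE argument.
        return False, (), ()
    return clause.single, tuple(clause.partition), tuple(clause.order)
```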





[GitHub] [spark] allisonwang-db commented on pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "allisonwang-db (via GitHub)" <gi...@apache.org>.
allisonwang-db commented on PR #42174:
URL: https://github.com/apache/spark/pull/42174#issuecomment-1659467625

   @dtenedor since the TABLE argument support is already in Spark 3.5, should we add this to Spark 3.5 as well? also cc @cloud-fan 




[GitHub] [spark] cloud-fan commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1280631670


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/FunctionTableSubqueryArgumentExpression.scala:
##########
@@ -32,11 +32,40 @@ import org.apache.spark.sql.types.DataType
  * table in the catalog. In the latter case, the relation argument comprises
  * a table subquery that may itself refer to one or more tables in its own
  * FROM clause.
+ *
+ * Each TABLE argument may also optionally include a PARTITION BY clause. If present, these indicate
+ * how to logically split up the input relation such that the table-valued function evaluates
+ * exactly once for each partition, and returns the union of all results. If no partitioning list is
+ * present, this splitting of the input relation is undefined. Furthermore, if the PARTITION BY
+ * clause includes a following ORDER BY clause, Catalyst will sort the rows in each partition such
+ * that the table-valued function receives them one-by-one in the requested order. Otherwise, if no
+ * such ordering is specified, the ordering of rows within each partition is undefined.
+ *
+ * @param plan the logical plan provided as input for the table argument as either a logical
+ *             relation or as a more complex logical plan in the event of a table subquery.
+ * @param outerAttrs outer references of this subquery plan, generally empty since these table
+ *                   arguments do not allow correlated references currently
+ * @param exprId expression ID of this subquery expression, generally generated afresh each time
+ * @param partitionByExpressions if non-empty, the TABLE argument included the PARTITION BY clause
+ *                               to indicate that the input relation should be repartitioned by the
+ *                               hash of the provided expressions, such that all the rows with each
+ *                               unique combination of values of the partitioning expressions will
+ *                               be consumed by exactly one instance of the table function class.
+ * @param withSinglePartition if true, the TABLE argument included the WITH SINGLE PARTITION clause
+ *                            to indicate that the entire input relation should be repartitioned to
+ *                            one worker for consumption by exactly one instance of the table
+ *                            function class.
+ * @param orderByExpressions if non-empty, the TABLE argument included the ORDER BY clause to
+ *                           indicate that the rows within each partition of the table function are
+ *                           to arrive in the provided order.
  */
 case class FunctionTableSubqueryArgumentExpression(
     plan: LogicalPlan,
     outerAttrs: Seq[Expression] = Seq.empty,
-    exprId: ExprId = NamedExpression.newExprId)
+    exprId: ExprId = NamedExpression.newExprId,
+    partitionByExpressions: Seq[Expression] = Seq.empty,
+    withSinglePartition: Boolean = false,
+    orderByExpressions: Seq[SortOrder] = Seq.empty)
   extends SubqueryExpression(plan, outerAttrs, exprId, Seq.empty, None) with Unevaluable {
 

Review Comment:
   Shall we add an assert here that `partitionByExpressions` must be `Nil` when `withSinglePartition` is true?
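   A minimal Python sketch of the invariant suggested above. It assumes a hypothetical `TableArgument` record that mirrors the three new constructor parameters; the real class is the Scala `FunctionTableSubqueryArgumentExpression`, and these names are illustrative only:

   ```python
   from dataclasses import dataclass
   from typing import Tuple


   @dataclass(frozen=True)
   class TableArgument:
       """Hypothetical stand-in for the new fields on
       FunctionTableSubqueryArgumentExpression (illustration only)."""
       partition_by: Tuple[str, ...] = ()
       with_single_partition: bool = False
       order_by: Tuple[str, ...] = ()

       def __post_init__(self) -> None:
           # WITH SINGLE PARTITION and PARTITION BY <exprs> are mutually
           # exclusive, so reject objects that set both. Raised explicitly
           # (not via `assert`) so the check also survives `python -O`.
           if self.with_single_partition and self.partition_by:
               raise AssertionError(
                   "partition_by must be empty when with_single_partition is true")


   TableArgument(partition_by=("a",), order_by=("b",))       # accepted
   TableArgument(with_single_partition=True, order_by=("b",))  # accepted
   ```

   Checking in the constructor means no code path can build the contradictory combination, regardless of which parser or analyzer rule creates the object.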


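   The Scaladoc in this diff describes the intended semantics. Rows are grouped by the PARTITION BY values, or into one group for WITH SINGLE PARTITION. Each group is optionally sorted by the ORDER BY keys. The function is evaluated once per group, and the results are unioned. The following is a minimal pure-Python model of that logical contract. It assumes rows are dicts and the table function is a plain callable over an iterator of rows. `evaluate_table_arg` and `first_and_count` are hypothetical names, not Spark APIs. The sketch ignores hash repartitioning, sort direction, and null ordering, and its deterministic group order is an artifact of the model:

   ```python
   from itertools import chain
   from typing import Any, Callable, Dict, Iterable, Iterator, List, Sequence, Tuple

   Row = Dict[str, Any]


   def evaluate_table_arg(
       rows: Iterable[Row],
       func: Callable[[Iterator[Row]], Iterable[Row]],
       partition_by: Sequence[str] = (),
       with_single_partition: bool = False,
       order_by: Sequence[str] = (),
   ) -> List[Row]:
       """Hypothetical model of TABLE(...) PARTITION BY ... ORDER BY ... semantics."""
       if with_single_partition and partition_by:
           raise ValueError("WITH SINGLE PARTITION cannot be combined with PARTITION BY")

       # Group rows by the values of the PARTITION BY columns. With WITH SINGLE
       # PARTITION every row gets the empty key, so all rows form one group.
       # With no clause at all this toy model also uses one group, whereas Spark
       # leaves the split undefined.
       groups: Dict[Tuple[Any, ...], List[Row]] = {}
       for row in rows:
           key = tuple(row[col] for col in partition_by)
           groups.setdefault(key, []).append(row)

       outputs = []
       for group in groups.values():
           if order_by:
               # Sort only within this partition, ascending by the ORDER BY columns.
               group = sorted(group, key=lambda r: tuple(r[c] for c in order_by))
           # Evaluate the function exactly once per partition.
           outputs.append(func(iter(group)))

       # The final result is the union (here: concatenation) of all outputs.
       return list(chain.from_iterable(outputs))


   def first_and_count(it: Iterator[Row]) -> List[Row]:
       # Example table function: emits one summary row per partition.
       part = list(it)
       return [{"k": part[0]["k"], "first_v": part[0]["v"], "count": len(part)}]


   rows = [{"k": 1, "v": 3}, {"k": 2, "v": 5}, {"k": 1, "v": 1}]
   print(evaluate_table_arg(rows, first_and_count, partition_by=["k"], order_by=["v"]))
   # prints [{'k': 1, 'first_v': 1, 'count': 2}, {'k': 2, 'first_v': 5, 'count': 1}]
   ```

   Because ORDER BY is applied per partition, `first_v` is the smallest `v` within each `k` group rather than across the whole input.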


[GitHub] [spark] cloud-fan commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1280636015


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala:
##########
@@ -1564,14 +1564,43 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
     }.getOrElse {
       plan(ctx.query)
     }
-    val partitioning = Option(ctx.tableArgumentPartitioning)
-    if (partitioning.isDefined) {
-      // The PARTITION BY clause is not implemented yet for TABLE arguments to table valued function
-      // calls.
-      operationNotAllowed(
-        "Specifying the PARTITION BY clause for TABLE arguments is not implemented yet", ctx)
-    }
-    FunctionTableSubqueryArgumentExpression(p)
+    var withSinglePartition = false
+    var partitionByExpressions = Seq.empty[Expression]
+    var orderByExpressions = Seq.empty[SortOrder]
+    Option(ctx.tableArgumentPartitioning)
+      .foreach { p =>
+        if (p.SINGLE != null) {
+          withSinglePartition = true
+        }
+      }
+    Option(ctx.tableArgumentPartitioning)
+      .map(_.partition.asScala.map(expression))
+      .foreach { expressions =>
+        if (expressions.nonEmpty) {
+          partitionByExpressions = expressions
+        }
+      }
+    Option(ctx.tableArgumentPartitioning)

Review Comment:
   nit:
   ```
   Option(ctx.tableArgumentPartitioning).foreach { p =>
     if (p.SINGLE != null) {
       withSinglePartition = true
     }
     partitionByExpressions = p.partition.asScala.map(expression)
     orderByExpressions = p.sortItem.asScala.map(visitSortItem)
   }
   ```
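   The suggested single `foreach` is equivalent to the three separate `Option(...)` passes in the diff. The following Python transliteration of its shape illustrates why. `PartitioningClause` and `extract_partitioning` are hypothetical stand-ins for the ANTLR parse context and the `AstBuilder` logic, not Spark code:

   ```python
   from dataclasses import dataclass, field
   from typing import List, Optional, Tuple


   @dataclass
   class PartitioningClause:
       """Hypothetical stand-in for the parsed tableArgumentPartitioning context."""
       single: bool = False                                  # WITH SINGLE PARTITION
       partition: List[str] = field(default_factory=list)   # PARTITION BY expressions
       sort_items: List[str] = field(default_factory=list)  # ORDER BY items


   def extract_partitioning(
       clause: Optional[PartitioningClause],
   ) -> Tuple[bool, List[str], List[str]]:
       # Defaults used when the TABLE argument has no partitioning clause at all.
       with_single_partition, partition_by, order_by = False, [], []
       # Check the optional clause once and read every field from it, instead of
       # re-wrapping it in a separate None/Option check per field.
       if clause is not None:
           with_single_partition = clause.single
           # Assigning directly matches the diff's `nonEmpty` guard, because an
           # empty list is already the default value.
           partition_by = list(clause.partition)
           order_by = list(clause.sort_items)
       return with_single_partition, partition_by, order_by
   ```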




[GitHub] [spark] ueshin commented on pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #42174:
URL: https://github.com/apache/spark/pull/42174#issuecomment-1662694638

   @dtenedor @allisonwang-db @cloud-fan 
   Oh, wait. The previous PR adding the grammar for the PARTITION BY and ORDER BY clause was merged only into `master`.
   If we need to support this in 3.5, could you also submit a PR to backport it?



[GitHub] [spark] cloud-fan commented on a diff in pull request #42174: [SPARK-44503][SQL] Add analysis and planning for PARTITION BY and ORDER BY clause after TABLE arguments for TVF calls

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #42174:
URL: https://github.com/apache/spark/pull/42174#discussion_r1280622930


##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -14,9 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
 import os
+import re
 import shutil
 import tempfile
+import textwrap

Review Comment:
   unnecessary change.


