You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "allisonwang-db (via GitHub)" <gi...@apache.org> on 2023/08/30 18:44:17 UTC

[GitHub] [spark] allisonwang-db commented on a diff in pull request #42595: [SPARK-44901][SQL] Add API in Python UDTF 'analyze' method to return partitioning/ordering expressions

allisonwang-db commented on code in PR #42595:
URL: https://github.com/apache/spark/pull/42595#discussion_r1310679860


##########
python/pyspark/sql/udtf.py:
##########
@@ -61,6 +61,26 @@ class AnalyzeArgument:
     is_table: bool
 
 
+@dataclass(frozen=True)
+class PartitioningColumn:
+    """
+    Represents a UDTF column for purposes of returning metadata from the 'analyze' method.
+    """
+
+    name: str
+
+
+@dataclass(frozen=True)
+class OrderingColumn:

Review Comment:
   The `SortOrder` class also includes `nullOrdering`. Do we want to add it here too? For example, `ORDER BY t.c1 DESC NULLS FIRST`.



##########
python/pyspark/sql/tests/test_udtf.py:
##########
@@ -2131,6 +2133,118 @@ def terminate(self):
             [Row(count=40, total=60, last=2)],
         )
 
+    def test_udtf_with_table_argument_with_single_partition_from_analyze(self):
+        @udtf
+        class TestUDTF:
+            def __init__(self):
+                self._count = 0
+                self._sum = 0
+                self._last = None
+
+            @staticmethod
+            def analyze(self):
+                return AnalyzeResult(
+                    schema=StructType()
+                    .add("count", IntegerType())
+                    .add("total", IntegerType())
+                    .add("last", IntegerType()),
+                    with_single_partition=True,
+                    order_by=[OrderingColumn("input"), OrderingColumn("partition_col")],
+                )
+
+            def eval(self, row: Row):
+                # Make sure that the rows arrive in the expected order.
+                if self._last is not None and self._last > row["input"]:
+                    raise Exception(
+                        f"self._last was {self._last} but the row value was {row['input']}"
+                    )
+                self._count += 1
+                self._last = row["input"]
+                self._sum += row["input"]
+
+            def terminate(self):
+                yield self._count, self._sum, self._last
+
+        self.spark.udtf.register("test_udtf", TestUDTF)
+
+        self.assertEqual(

Review Comment:
   Let's use the new PySpark testing utility `assertDataFrameEqual` added by @asl3.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                   catalog, "table-valued functions")
               }
             }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult
                   case _ =>
                     assert(!t.hasRepartitioning,
                       "Cannot evaluate the table-valued function call because it included the " +
                         "PARTITION BY clause, but only Python table functions support this clause")
                 }
-                tableArgs.append(SubqueryAlias(alias, t.evaluable))
-                functionTableSubqueryArgs.append(t)
+                // Check if this is a call to a Python user-defined table function whose polymorphic
+                // 'analyze' method returned metadata indicated requested partitioning and/or
+                // ordering properties of the input relation. In that event, make sure that the UDTF
+                // call did not include any explicit PARTITION BY and/or ORDER BY clauses for the
+                // corresponding TABLE argument, and then update the TABLE argument representation
+                // to apply the requested partitioning and/or ordering.
+                pythonUDTFAnalyzeResult.map { a =>

Review Comment:
   nit: `a` -> `analyzeResult`



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                   catalog, "table-valued functions")
               }
             }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult
                   case _ =>
                     assert(!t.hasRepartitioning,
                       "Cannot evaluate the table-valued function call because it included the " +
                         "PARTITION BY clause, but only Python table functions support this clause")
                 }
-                tableArgs.append(SubqueryAlias(alias, t.evaluable))
-                functionTableSubqueryArgs.append(t)
+                // Check if this is a call to a Python user-defined table function whose polymorphic
+                // 'analyze' method returned metadata indicated requested partitioning and/or
+                // ordering properties of the input relation. In that event, make sure that the UDTF
+                // call did not include any explicit PARTITION BY and/or ORDER BY clauses for the
+                // corresponding TABLE argument, and then update the TABLE argument representation
+                // to apply the requested partitioning and/or ordering.
+                pythonUDTFAnalyzeResult.map { a =>
+                  if (a.withSinglePartition && a.partitionByExpressions.nonEmpty) {
+                    throw QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+                      functionName = pythonUDTF.get.name,
+                      reason = "the 'with_single_partition' field cannot be assigned to true " +
+                        "if the 'partition_by' list is non-empty")
+                  } else if (a.orderByExpressions.nonEmpty && !a.withSinglePartition &&
+                    a.partitionByExpressions.isEmpty) {
+                    throw QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+                      functionName = pythonUDTF.get.name,
+                      reason = "the 'order_by' field cannot be non-empty unless the " +
+                        "'with_single_partition' field is set to true or the 'partition_by' list " +
+                        "is non-empty")
+                  } else if (a.hasRepartitioning && t.hasRepartitioning) {
+                    throw QueryCompilationErrors
+                      .tableValuedFunctionRequiredMetadataIncompatibleWithCall(
+                        functionName = pythonUDTF.get.name,
+                        requestedMetadata =
+                          "specified its own required partitioning of the input table",
+                        invalidFunctionCallProperty =
+                          "specified the WITH SINGLE PARTITION or PARTITION BY clause; " +
+                            "please remove these clauses and retry the query again.")
+                  }
+                  var withSinglePartition = t.withSinglePartition
+                  var partitionByExpressions = t.partitionByExpressions
+                  var orderByExpressions = t.orderByExpressions
+                  if (a.withSinglePartition) {
+                    withSinglePartition = true
+                  }
+                  if (a.partitionByExpressions.nonEmpty) {
+                    partitionByExpressions = a.partitionByExpressions
+                  }
+                  if (a.orderByExpressions.nonEmpty) {
+                    orderByExpressions = a.orderByExpressions
+                  }

Review Comment:
   One option, which makes the behavior easier to document and easier for users to understand, is to simply throw an exception if both the table subquery and the UDTF's analysis result contain any of this metadata.
   For example:
   The UDTF's AnalyzeResult sets `with_single_partition = True`, but the UDTF is invoked as `udtf(TABLE(t) ORDER BY t.c1)`.
   
   We can throw an exception stating that the UDTF already has 'PARTITION BY' or 'ORDER BY' defined in its analysis result, and thus it cannot be used in conjunction with the SQL 'PARTITION BY' or 'ORDER BY' syntax. 
   
   Of course, this approach limits the expressiveness of UDTFs. Another option is to retain the overwriting logic here, but we would need to clearly document the behavior when a UDTF has both an 'ORDER BY' clause in the SQL query and within the UDTF's AnalyzeResult.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                   catalog, "table-valued functions")
               }
             }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult
                   case _ =>
                     assert(!t.hasRepartitioning,
                       "Cannot evaluate the table-valued function call because it included the " +
                         "PARTITION BY clause, but only Python table functions support this clause")
                 }
-                tableArgs.append(SubqueryAlias(alias, t.evaluable))
-                functionTableSubqueryArgs.append(t)
+                // Check if this is a call to a Python user-defined table function whose polymorphic
+                // 'analyze' method returned metadata indicated requested partitioning and/or
+                // ordering properties of the input relation. In that event, make sure that the UDTF
+                // call did not include any explicit PARTITION BY and/or ORDER BY clauses for the
+                // corresponding TABLE argument, and then update the TABLE argument representation
+                // to apply the requested partitioning and/or ordering.
+                pythonUDTFAnalyzeResult.map { a =>
+                  if (a.withSinglePartition && a.partitionByExpressions.nonEmpty) {
+                    throw QueryCompilationErrors.tableValuedFunctionRequiredMetadataInvalid(
+                      functionName = pythonUDTF.get.name,
+                      reason = "the 'with_single_partition' field cannot be assigned to true " +
+                        "if the 'partition_by' list is non-empty")
+                  } else if (a.orderByExpressions.nonEmpty && !a.withSinglePartition &&

Review Comment:
   nit: `else if` -> `if` (the preceding branch always throws, so the `else` is redundant).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                   catalog, "table-valued functions")
               }
             }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult
                   case _ =>
                     assert(!t.hasRepartitioning,
                       "Cannot evaluate the table-valued function call because it included the " +
                         "PARTITION BY clause, but only Python table functions support this clause")
                 }
-                tableArgs.append(SubqueryAlias(alias, t.evaluable))
-                functionTableSubqueryArgs.append(t)
+                // Check if this is a call to a Python user-defined table function whose polymorphic
+                // 'analyze' method returned metadata indicated requested partitioning and/or
+                // ordering properties of the input relation. In that event, make sure that the UDTF
+                // call did not include any explicit PARTITION BY and/or ORDER BY clauses for the
+                // corresponding TABLE argument, and then update the TABLE argument representation
+                // to apply the requested partitioning and/or ordering.
+                pythonUDTFAnalyzeResult.map { a =>

Review Comment:
   How about extracting this logic into a helper function such as `resolvePolymorphicPythonUDTF`? Otherwise, the TVF resolution logic here becomes a bit hard to read. For example:
   if (pythonUDTFAnalyzeResult.isDefined) { resolvePolymorphicPythonUDTF(...) }



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -81,7 +84,7 @@ case class UserDefinedPythonTableFunction(
     func: PythonFunction,
     returnType: Option[StructType],
     pythonEvalType: Int,
-    udfDeterministic: Boolean) {
+    udfDeterministic: Boolean) extends Logging {

Review Comment:
   Is this change needed?



##########
sql/core/src/test/resources/sql-tests/inputs/udtf/udtf.sql:
##########
@@ -16,3 +19,27 @@ SELECT * FROM udtf(1, 2) t(c1, c2), LATERAL udtf(c1, c2);
 
 -- test non-deterministic input
 SELECT * FROM udtf(cast(rand(0) AS int) + 1, 1);
+
+-- test UDTF calls that take input TABLE arguments
+SELECT * FROM UDTFCountSumLast(TABLE(t2) WITH SINGLE PARTITION);

Review Comment:
   It would be really great if we could add some comments describing these UDTFs (e.g., what their `analyze` methods return).



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -2096,23 +2096,87 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
                   catalog, "table-valued functions")
               }
             }
-
+            // Resolve Python UDTF calls if needed.
+            val resolvedFunc = resolvedTvf match {
+              case g @ Generate(u: UnresolvedPolymorphicPythonUDTF, _, _, _, _, _) =>
+                val analyzeResult: PythonUDTFAnalyzeResult =
+                  u.resolveElementMetadata(u.func, u.children)
+                g.copy(generator =
+                  PythonUDTF(u.name, u.func, analyzeResult.schema, u.children,
+                    u.evalType, u.udfDeterministic, u.resultId, u.pythonUDTFPartitionColumnIndexes,
+                    analyzeResult = Some(analyzeResult)))
+              case other =>
+                other
+            }
             val tableArgs = mutable.ArrayBuffer.empty[LogicalPlan]
             val functionTableSubqueryArgs =
               mutable.ArrayBuffer.empty[FunctionTableSubqueryArgumentExpression]
             val tvf = resolvedFunc.transformAllExpressionsWithPruning(
               _.containsPattern(FUNCTION_TABLE_RELATION_ARGUMENT_EXPRESSION), ruleId)  {
               case t: FunctionTableSubqueryArgumentExpression =>
                 val alias = SubqueryAlias.generateSubqueryName(s"_${tableArgs.size}")
+                var pythonUDTF = Option.empty[PythonUDTF]
+                var pythonUDTFAnalyzeResult = Option.empty[PythonUDTFAnalyzeResult]
                 resolvedFunc match {
-                  case Generate(_: PythonUDTF, _, _, _, _, _) =>
+                  case Generate(p: PythonUDTF, _, _, _, _, _) =>
+                    pythonUDTF = Some(p)
+                    pythonUDTFAnalyzeResult = p.analyzeResult

Review Comment:
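   We can do the following (note that the `case _` branch must also return a tuple so the destructuring type-checks):
   ```
   val (pythonUDTF, pythonUDTFAnalyzeResult) = resolvedFunc match {
     case Generate...
       (Some(p), p.analyzeResult)
     case _ =>
       assert(...)
       (None, None)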
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org