Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2023/11/22 00:32:00 UTC

[PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

dtenedor opened a new pull request, #43946:
URL: https://github.com/apache/spark/pull/43946

   ### What changes were proposed in this pull request?
   
   This PR updates the Python user-defined table function (UDTF) API for the `analyze` method to support general expressions for the `partitionBy` and `orderBy` fields of the `AnalyzeResult` class.
   
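   For example, the following UDTF specifies to partition by `partition_col DIV 10` so that all rows with values of this column between 0 and 9 arrive in the same partition, all rows with values between 10 and 19 arrive in the next partition, and so on. Integral division is what makes this bucketing work: in Spark SQL, `partition_col / 10` returns a fractional value, so each distinct value of `partition_col` would form its own partition.
   
   ```
   from pyspark.sql import Row
   from pyspark.sql.functions import udtf
   from pyspark.sql.types import IntegerType, StructType
   # PartitioningExpression and OrderingExpression are the class names proposed in this PR.
   from pyspark.sql.udtf import AnalyzeResult, OrderingExpression, PartitioningExpression
   
   
   @udtf
   class TestUDTF:
       def __init__(self):
           # State accumulated over the rows of one partition.
           self._partition_col = None
           self._count = 0
           self._sum = 0
           self._last = None
   
       @staticmethod
       def analyze(*args, **kwargs):
           return AnalyzeResult(
               schema=StructType()
               .add("partition_col", IntegerType())
               .add("count", IntegerType())
               .add("total", IntegerType())
               .add("last", IntegerType()),
               # Partition by a general SQL expression over the input columns.
               partitionBy=[PartitioningExpression("partition_col DIV 10")],
               # Order rows within each partition by "input", ascending, NULLs last.
               orderBy=[
                   OrderingExpression(value="input", ascending=True, overrideNullsFirst=False)
               ],
           )
   
       def eval(self, row: Row):
           self._partition_col = row["partition_col"]
           self._count += 1
           self._last = row["input"]
           if row["input"] is not None:
               self._sum += row["input"]
   
       def terminate(self):
           # Emit one summary row per partition.
           yield self._partition_col, self._count, self._sum, self._last
   ```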
   
   ### Why are the changes needed?
   
   This lets the UDTF partition and order by simple references to the columns of the input table just like before, but also gives the option to use general expressions based on those columns (just like the explicit `PARTITION BY` and `ORDER BY` clauses in a UDTF call in SQL).
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, see above.
   
   ### How was this patch tested?
   
   This PR includes test coverage.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.
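The example UDTF in the PR description can be approximated without Spark to see what a partitioning expression does: rows are grouped by the expression's value, each group is sorted by the ordering expression, and one instance runs `eval` on each row of its group and then `terminate`. This is a minimal sketch under stated assumptions: `SimulatedUDTF`, `simulate`, and `input_nulls_last` are hypothetical names, rows are plain dicts instead of `Row` objects, a fresh instance per partition is assumed, and Spark's real scheduling is not modeled. It also shows why integral division matters for the bucketing described above.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterator, List, Optional, Tuple

# Hypothetical row type: plain dicts stand in for pyspark Row objects.
RowDict = Dict[str, Optional[int]]


class SimulatedUDTF:
    """Plain-Python copy of the example UDTF's eval/terminate logic (no Spark)."""

    def __init__(self) -> None:
        self._partition_col: Optional[int] = None
        self._count = 0
        self._sum = 0
        self._last: Optional[int] = None

    def eval(self, row: RowDict) -> None:
        self._partition_col = row["partition_col"]
        self._count += 1
        self._last = row["input"]
        if row["input"] is not None:
            self._sum += row["input"]

    def terminate(self) -> Iterator[Tuple[Optional[int], int, int, Optional[int]]]:
        yield self._partition_col, self._count, self._sum, self._last


def input_nulls_last(row: RowDict) -> Tuple[bool, int]:
    # Ascending on "input" with NULLs last, mirroring overrideNullsFirst=False.
    value = row["input"]
    return (value is None, value if value is not None else 0)


def simulate(rows: List[RowDict],
             partition_key: Callable[[RowDict], Hashable],
             order_key: Callable[[RowDict], Tuple[bool, int]]) -> List[Tuple]:
    # Group rows by the value of the partitioning expression.
    groups: Dict[Hashable, List[RowDict]] = defaultdict(list)
    for row in rows:
        groups[partition_key(row)].append(row)
    results: List[Tuple] = []
    for key in sorted(groups):  # deterministic output order for the example
        instance = SimulatedUDTF()  # assumed: fresh state per partition
        for row in sorted(groups[key], key=order_key):
            instance.eval(row)
        results.extend(instance.terminate())
    return results


rows: List[RowDict] = [
    {"partition_col": 3, "input": 1},
    {"partition_col": 12, "input": 5},
    {"partition_col": 7, "input": 2},
    {"partition_col": 15, "input": None},
]

# Python's // matches Spark's DIV for non-negative values: buckets 0-9 and 10-19.
print(simulate(rows, lambda r: r["partition_col"] // 10, input_nulls_last))
# -> [(7, 2, 3, 2), (15, 2, 5, None)]

# Fractional division gives every distinct value its own partition.
print(len(simulate(rows, lambda r: r["partition_col"] / 10, input_nulls_last)))
# -> 4
```

Note that Python's `//` floors while Spark's `DIV` truncates toward zero, so the two only agree for non-negative inputs like the ones used here.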


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1409698731


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -106,7 +106,7 @@ case class UserDefinedPythonTableFunction(
     this(name, func, None, pythonEvalType, udfDeterministic)
   }
 
-  def builder(exprs: Seq[Expression]): LogicalPlan = {
+  def builder(exprs: Seq[Expression], parser: ParserInterface): LogicalPlan = {

Review Comment:
   This should be `parser: => ParserInterface` to be a by-name argument. https://github.com/apache/spark/pull/43946#discussion_r1408346486
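The point of a by-name parameter is that the argument is not evaluated when `builder` is registered; it is evaluated only where `builder` uses it, and again on every use, so the parser can come from whichever session is active at call time. Python has no by-name parameters, so a minimal sketch of the same idea passes a zero-argument callable instead. `FakeParser`, `ActiveSession`, `builder`, and `registered_builder` are hypothetical stand-ins, not Spark APIs.

```python
from typing import Callable, List, Optional


class FakeParser:
    """Hypothetical stand-in for a session's SQL parser."""

    def __init__(self, session_name: str) -> None:
        self.session_name = session_name

    def parse_expression(self, sql: str) -> str:
        # A real parser returns an expression tree; tagging the text is enough here.
        return f"{self.session_name}:{sql}"


class ActiveSession:
    """Hypothetical registry for the currently active session's parser."""

    current: Optional[FakeParser] = None
    lookups = 0

    @classmethod
    def parser(cls) -> FakeParser:
        cls.lookups += 1
        if cls.current is None:
            raise RuntimeError("no active session")
        return cls.current


def builder(exprs: List[str], parser: Callable[[], FakeParser]) -> List[str]:
    # `parser` is a zero-argument thunk, the Python analogue of Scala's by-name
    # `parser: => ParserInterface`: it is evaluated only where it is used,
    # and again on every use.
    return [parser().parse_expression(e) for e in exprs]


def registered_builder(exprs: List[str]) -> List[str]:
    # Registration captures how to find the parser, not a parser instance.
    return builder(exprs, ActiveSession.parser)


ActiveSession.current = FakeParser("s1")
print(registered_builder([]), ActiveSession.lookups)  # -> [] 0
ActiveSession.current = FakeParser("s2")
print(registered_builder(["partition_col + 1"]))  # -> ['s2:partition_col + 1']
```

With no expressions to parse, the parser is never looked up; when a different session becomes active after registration, the next call picks up its parser.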




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1402813208


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -237,33 +239,32 @@ class UserDefinedPythonTableFunctionAnalyzeRunner(
 
     // Receive whether the "with single partition" property is requested.
     val withSinglePartition = dataIn.readInt() == 1
-    // Receive the list of requested partitioning columns, if any.
-    val partitionByColumns = ArrayBuffer.empty[Expression]
-    val numPartitionByColumns = dataIn.readInt()
-    for (_ <- 0 until numPartitionByColumns) {
-      val columnName = PythonWorkerUtils.readUTF(dataIn)
-      partitionByColumns.append(UnresolvedAttribute(columnName))
+    // Receive the list of requested partitioning expressions, if any.
+    val partitionByExpressions = ArrayBuffer.empty[Expression]
+    val numPartitionByExpressions = dataIn.readInt()
+    for (_ <- 0 until numPartitionByExpressions) {
+      val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+      val parsed: Expression = parser.parseExpression(expressionSql)
+      partitionByExpressions.append(parsed)
     }
-    // Receive the list of requested ordering columns, if any.
+    // Receive the list of requested ordering expressions, if any.
     val orderBy = ArrayBuffer.empty[SortOrder]
     val numOrderByItems = dataIn.readInt()
     for (_ <- 0 until numOrderByItems) {
-      val columnName = PythonWorkerUtils.readUTF(dataIn)
+      val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+      val parsed: Expression = parser.parseExpression(expressionSql)
       val direction = if (dataIn.readInt() == 1) Ascending else Descending
       val overrideNullsFirst = dataIn.readInt()
       overrideNullsFirst match {
-        case 0 =>
-          orderBy.append(SortOrder(UnresolvedAttribute(columnName), direction))
-        case 1 => orderBy.append(
-          SortOrder(UnresolvedAttribute(columnName), direction, NullsFirst, Seq.empty))
-        case 2 => orderBy.append(
-          SortOrder(UnresolvedAttribute(columnName), direction, NullsLast, Seq.empty))
+        case 0 => orderBy.append(SortOrder(parsed, direction))
+        case 1 => orderBy.append(SortOrder(parsed, direction, NullsFirst, Seq.empty))
+        case 2 => orderBy.append(SortOrder(parsed, direction, NullsLast, Seq.empty))
       }

Review Comment:
   Oh, does it fail? I thought it's a valid expression. NVM, then.
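The hunk above reads the partitioning and ordering metadata as a sequence of ints and length-prefixed strings. A round-trip sketch of that layout in Python, assuming 4-byte big-endian ints (what `DataInputStream.readInt` reads) and assuming `PythonWorkerUtils.readUTF` expects a 4-byte byte length followed by UTF-8 bytes. `OrderItem`, `encode_partitioning`, and `decode_partitioning` are hypothetical names, and only the fields visible in the hunk are covered.

```python
import io
import struct
from dataclasses import dataclass
from typing import BinaryIO, List, Optional, Tuple


@dataclass(frozen=True)
class OrderItem:
    """Hypothetical container for one requested ordering expression."""

    expression_sql: str
    ascending: bool = True
    override_nulls_first: Optional[bool] = None  # None: use the default null order


# Codes matched by the Scala side: 0 = default, 1 = NullsFirst, 2 = NullsLast.
_NULLS_TO_CODE = {None: 0, True: 1, False: 2}
_CODE_TO_NULLS = {code: value for value, code in _NULLS_TO_CODE.items()}


def _write_int(out: BinaryIO, value: int) -> None:
    out.write(struct.pack(">i", value))  # 4-byte big-endian, like readInt()


def _write_utf(out: BinaryIO, text: str) -> None:
    data = text.encode("utf-8")
    _write_int(out, len(data))  # length in bytes, not characters
    out.write(data)


def _read_int(inp: BinaryIO) -> int:
    return struct.unpack(">i", inp.read(4))[0]


def _read_utf(inp: BinaryIO) -> str:
    return inp.read(_read_int(inp)).decode("utf-8")


def encode_partitioning(with_single_partition: bool, partition_by: List[str],
                        order_by: List[OrderItem]) -> bytes:
    out = io.BytesIO()
    _write_int(out, 1 if with_single_partition else 0)
    _write_int(out, len(partition_by))
    for sql in partition_by:
        _write_utf(out, sql)
    _write_int(out, len(order_by))
    for item in order_by:
        _write_utf(out, item.expression_sql)
        _write_int(out, 1 if item.ascending else 0)  # 1 = Ascending
        _write_int(out, _NULLS_TO_CODE[item.override_nulls_first])
    return out.getvalue()


def decode_partitioning(payload: bytes) -> Tuple[bool, List[str], List[OrderItem]]:
    inp = io.BytesIO(payload)
    with_single_partition = _read_int(inp) == 1
    partition_by = [_read_utf(inp) for _ in range(_read_int(inp))]
    order_by: List[OrderItem] = []
    for _ in range(_read_int(inp)):
        sql = _read_utf(inp)
        ascending = _read_int(inp) == 1
        code = _read_int(inp)
        if code not in _CODE_TO_NULLS:
            # The Scala `match` has no default case, so other codes would fail there too.
            raise ValueError(f"unknown null-ordering code {code}")
        order_by.append(OrderItem(sql, ascending, _CODE_TO_NULLS[code]))
    return with_single_partition, partition_by, order_by


payload = encode_partitioning(False, ["partition_col DIV 10"], [OrderItem("input", True, False)])
print(decode_partitioning(payload))
```

In this layout the expression text travels as an opaque string and is parsed only on the JVM side, which is why a string that does not parse as a standalone expression fails the query there.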




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1408346486


##########
sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala:
##########
@@ -44,6 +45,7 @@ class UDTFRegistration private[sql] (tableFunctionRegistry: TableFunctionRegistr
          | udfDeterministic: ${udtf.udfDeterministic}
       """.stripMargin)
 
-    tableFunctionRegistry.createOrReplaceTempFunction(name, udtf.builder, "python_udtf")
+    tableFunctionRegistry.createOrReplaceTempFunction(
+      name, udtf.builder(_, new CatalystSqlParser()), "python_udtf")

Review Comment:
   We shouldn't use `CatalystSqlParser`.
   How about making the `parser` argument of `builder` a by-name parameter and using the parser from the active session here?




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1412551814


##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -621,10 +623,30 @@ object IntegratedUDFTestUtils extends SQLHelper {
         |""".stripMargin
   }
 
+  object UDTFPartitionByOrderBy
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "input")
+
+  object UDTFPartitionByOrderByComplexExpr
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col + 1",
+      orderBy = "RANDOM(42)")
+
+  object UDTFInvalidPartitionByOrderByParseError
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "unparsable",
+      orderBy = "input")
+
+  object UDTFInvalidOrderByAscKeyword
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "partition_col ASC")

Review Comment:
   Apologies about that; it seems it was a bad merge from the last PR. I have fixed this now, and the tests are now called.




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1412503785


##########
sql/core/src/test/scala/org/apache/spark/sql/IntegratedUDFTestUtils.scala:
##########
@@ -621,10 +623,30 @@ object IntegratedUDFTestUtils extends SQLHelper {
         |""".stripMargin
   }
 
+  object UDTFPartitionByOrderBy
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "input")
+
+  object UDTFPartitionByOrderByComplexExpr
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col + 1",
+      orderBy = "RANDOM(42)")
+
+  object UDTFInvalidPartitionByOrderByParseError
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "unparsable",
+      orderBy = "input")
+
+  object UDTFInvalidOrderByAscKeyword
+    extends TestPythonUDTFPartitionByOrderByBase(
+      partitionBy = "partition_col",
+      orderBy = "partition_col ASC")

Review Comment:
   Where are these new tests called?




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1414354395


##########
sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out:
##########
@@ -335,6 +335,37 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2))
+-- !query schema
+struct<partition_col:int,count:int,total:int,last:int>
+-- !query output
+0	1	1	1
+1	2	5	3
+
+
+-- !query
+SELECT * FROM InvalidTestPythonUDTFOrderByAscKeyword(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVABLE_TABLE_VALUED_FUNCTION",
+  "sqlState" : "42883",
+  "messageParameters" : {
+    "name" : "`InvalidTestPythonUDTFOrderByAscKeyword`"

Review Comment:
   Ah, then, the updated test seems to be duplicated now:
   
   ```sql
   SELECT * FROM UDTFInvalidOrderByAscKeyword(TABLE(t2));
   ```
   
   and the latter one failed with `ParseException` because of the missing `;` at the end.




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #43946:
URL: https://github.com/apache/spark/pull/43946#issuecomment-1821907222

   cc @ueshin @allisonwang-db 



Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1402787837


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -190,6 +190,8 @@ class UserDefinedPythonTableFunctionAnalyzeRunner(
 
   override val workerModule = "pyspark.sql.worker.analyze_udtf"
 
+  private lazy val parser = new CatalystSqlParser()

Review Comment:
   This is done.




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1402696088


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -190,6 +190,8 @@ class UserDefinedPythonTableFunctionAnalyzeRunner(
 
   override val workerModule = "pyspark.sql.worker.analyze_udtf"
 
+  private lazy val parser = new CatalystSqlParser()

Review Comment:
   We should use a parser in `SparkSession`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -237,33 +239,32 @@ class UserDefinedPythonTableFunctionAnalyzeRunner(
 
     // Receive whether the "with single partition" property is requested.
     val withSinglePartition = dataIn.readInt() == 1
-    // Receive the list of requested partitioning columns, if any.
-    val partitionByColumns = ArrayBuffer.empty[Expression]
-    val numPartitionByColumns = dataIn.readInt()
-    for (_ <- 0 until numPartitionByColumns) {
-      val columnName = PythonWorkerUtils.readUTF(dataIn)
-      partitionByColumns.append(UnresolvedAttribute(columnName))
+    // Receive the list of requested partitioning expressions, if any.
+    val partitionByExpressions = ArrayBuffer.empty[Expression]
+    val numPartitionByExpressions = dataIn.readInt()
+    for (_ <- 0 until numPartitionByExpressions) {
+      val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+      val parsed: Expression = parser.parseExpression(expressionSql)
+      partitionByExpressions.append(parsed)
     }
-    // Receive the list of requested ordering columns, if any.
+    // Receive the list of requested ordering expressions, if any.
     val orderBy = ArrayBuffer.empty[SortOrder]
     val numOrderByItems = dataIn.readInt()
     for (_ <- 0 until numOrderByItems) {
-      val columnName = PythonWorkerUtils.readUTF(dataIn)
+      val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+      val parsed: Expression = parser.parseExpression(expressionSql)
       val direction = if (dataIn.readInt() == 1) Ascending else Descending
       val overrideNullsFirst = dataIn.readInt()
       overrideNullsFirst match {
-        case 0 =>
-          orderBy.append(SortOrder(UnresolvedAttribute(columnName), direction))
-        case 1 => orderBy.append(
-          SortOrder(UnresolvedAttribute(columnName), direction, NullsFirst, Seq.empty))
-        case 2 => orderBy.append(
-          SortOrder(UnresolvedAttribute(columnName), direction, NullsLast, Seq.empty))
+        case 0 => orderBy.append(SortOrder(parsed, direction))
+        case 1 => orderBy.append(SortOrder(parsed, direction, NullsFirst, Seq.empty))
+        case 2 => orderBy.append(SortOrder(parsed, direction, NullsLast, Seq.empty))
       }

Review Comment:
   What happens if the expression contains something like `order_column DESC`?




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1414600570


##########
sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out:
##########
@@ -335,6 +335,37 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2))
+-- !query schema
+struct<partition_col:int,count:int,total:int,last:int>
+-- !query output
+0	1	1	1
+1	2	5	3
+
+
+-- !query
+SELECT * FROM InvalidTestPythonUDTFOrderByAscKeyword(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVABLE_TABLE_VALUED_FUNCTION",
+  "sqlState" : "42883",
+  "messageParameters" : {
+    "name" : "`InvalidTestPythonUDTFOrderByAscKeyword`"

Review Comment:
   Thanks for pointing that out, fixed it again!




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin closed pull request #43946: [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions
URL: https://github.com/apache/spark/pull/43946



Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1414329642


##########
sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out:
##########
@@ -335,6 +335,37 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2))
+-- !query schema
+struct<partition_col:int,count:int,total:int,last:int>
+-- !query output
+0	1	1	1
+1	2	5	3
+
+
+-- !query
+SELECT * FROM InvalidTestPythonUDTFOrderByAscKeyword(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVABLE_TABLE_VALUED_FUNCTION",
+  "sqlState" : "42883",
+  "messageParameters" : {
+    "name" : "`InvalidTestPythonUDTFOrderByAscKeyword`"

Review Comment:
   Apologies, I forgot to update the UDTF name. I've fixed it now, and the error message works as expected.




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #43946:
URL: https://github.com/apache/spark/pull/43946#issuecomment-1839813619

   Thanks! merging to master.



Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1410891346


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -106,7 +106,7 @@ case class UserDefinedPythonTableFunction(
     this(name, func, None, pythonEvalType, udfDeterministic)
   }
 
-  def builder(exprs: Seq[Expression]): LogicalPlan = {
+  def builder(exprs: Seq[Expression], parser: ParserInterface): LogicalPlan = {

Review Comment:
   This is done.




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1412596499


##########
sql/core/src/test/resources/sql-tests/results/udtf/udtf.sql.out:
##########
@@ -335,6 +335,37 @@ org.apache.spark.sql.AnalysisException
 }
 
 
+-- !query
+SELECT * FROM UDTFPartitionByOrderByComplexExpr(TABLE(t2))
+-- !query schema
+struct<partition_col:int,count:int,total:int,last:int>
+-- !query output
+0	1	1	1
+1	2	5	3
+
+
+-- !query
+SELECT * FROM InvalidTestPythonUDTFOrderByAscKeyword(TABLE(t2))
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.AnalysisException
+{
+  "errorClass" : "UNRESOLVABLE_TABLE_VALUED_FUNCTION",
+  "sqlState" : "42883",
+  "messageParameters" : {
+    "name" : "`InvalidTestPythonUDTFOrderByAscKeyword`"

Review Comment:
   Is this expected? From the class name, it seems it is not.




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1402787651


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/python/UserDefinedPythonFunction.scala:
##########
@@ -237,33 +239,32 @@ class UserDefinedPythonTableFunctionAnalyzeRunner(
 
     // Receive whether the "with single partition" property is requested.
     val withSinglePartition = dataIn.readInt() == 1
-    // Receive the list of requested partitioning columns, if any.
-    val partitionByColumns = ArrayBuffer.empty[Expression]
-    val numPartitionByColumns = dataIn.readInt()
-    for (_ <- 0 until numPartitionByColumns) {
-      val columnName = PythonWorkerUtils.readUTF(dataIn)
-      partitionByColumns.append(UnresolvedAttribute(columnName))
+    // Receive the list of requested partitioning expressions, if any.
+    val partitionByExpressions = ArrayBuffer.empty[Expression]
+    val numPartitionByExpressions = dataIn.readInt()
+    for (_ <- 0 until numPartitionByExpressions) {
+      val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+      val parsed: Expression = parser.parseExpression(expressionSql)
+      partitionByExpressions.append(parsed)
     }
-    // Receive the list of requested ordering columns, if any.
+    // Receive the list of requested ordering expressions, if any.
     val orderBy = ArrayBuffer.empty[SortOrder]
     val numOrderByItems = dataIn.readInt()
     for (_ <- 0 until numOrderByItems) {
-      val columnName = PythonWorkerUtils.readUTF(dataIn)
+      val expressionSql: String = PythonWorkerUtils.readUTF(dataIn)
+      val parsed: Expression = parser.parseExpression(expressionSql)
       val direction = if (dataIn.readInt() == 1) Ascending else Descending
       val overrideNullsFirst = dataIn.readInt()
       overrideNullsFirst match {
-        case 0 =>
-          orderBy.append(SortOrder(UnresolvedAttribute(columnName), direction))
-        case 1 => orderBy.append(
-          SortOrder(UnresolvedAttribute(columnName), direction, NullsFirst, Seq.empty))
-        case 2 => orderBy.append(
-          SortOrder(UnresolvedAttribute(columnName), direction, NullsLast, Seq.empty))
+        case 0 => orderBy.append(SortOrder(parsed, direction))
+        case 1 => orderBy.append(SortOrder(parsed, direction, NullsFirst, Seq.empty))
+        case 2 => orderBy.append(SortOrder(parsed, direction, NullsLast, Seq.empty))
       }

Review Comment:
   Anything that fails to parse should fail the query. I added a test case to cover this.




Re: [PR] [SPARK-46040][SQL][Python] Update UDTF API for 'analyze' partitioning/ordering columns to support general expressions [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #43946:
URL: https://github.com/apache/spark/pull/43946#discussion_r1409512037


##########
sql/core/src/main/scala/org/apache/spark/sql/UDTFRegistration.scala:
##########
@@ -44,6 +45,7 @@ class UDTFRegistration private[sql] (tableFunctionRegistry: TableFunctionRegistr
          | udfDeterministic: ${udtf.udfDeterministic}
       """.stripMargin)
 
-    tableFunctionRegistry.createOrReplaceTempFunction(name, udtf.builder, "python_udtf")
+    tableFunctionRegistry.createOrReplaceTempFunction(
+      name, udtf.builder(_, new CatalystSqlParser()), "python_udtf")

Review Comment:
   Done.


