You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "dtenedor (via GitHub)" <gi...@apache.org> on 2024/02/16 19:03:05 UTC

[PR] [WIP][SPARK-47032][Python] Prototype for adding pass-through columns to Python UDTF API [spark]

dtenedor opened a new pull request, #45142:
URL: https://github.com/apache/spark/pull/45142

   ### What changes were proposed in this pull request?
   
   [WIP] This is a prototype for adding pass-through columns to Python UDTF API. We'll develop it more before sending out for formal review.
   
   ### Why are the changes needed?
   
   We can use this API to forward column values from the UDTF input table to the output table directly, bypassing JVM/Python interchange and improving performance.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes, see above.
   
   ### How was this patch tested?
   
   This PR adds test coverage.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   NO.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47032][Python] Add UDTF API for "analyze" method to identify pass-through columns to output table [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #45142:
URL: https://github.com/apache/spark/pull/45142#issuecomment-1971717342

   We talked offline and the benefit from this is marginal without actually propagating the column values automatically to the output table. Having prototyped that and found no benefit in latency/CPU/memory, that's a non-starter. Therefore it seems this feature is not worth the additional complexity and I am closing this PR for now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47032][Python] Add UDTF API for "analyze" method to identify pass-through columns to output table [spark]

Posted by "ueshin (via GitHub)" <gi...@apache.org>.
ueshin commented on PR #45142:
URL: https://github.com/apache/spark/pull/45142#issuecomment-1970085184

   With the UDTF given in the description:
   
   ```py
   >>> spark.sql("SELECT * FROM Identity(TABLE(select 'x' as a, 1 as b))").show()
   +---+----+
   |  a|   b|
   +---+----+
   |  x|1001|
   +---+----+
   ```
   
   whereas:
   
   ```py
   >>> spark.sql("SELECT * FROM Identity(TABLE(select 'x' as a, 1 as b)) WHERE a = 'x'").show()
   Traceback (most recent call last):
   ...
   py4j.protocol.Py4JJavaError: An error occurred while calling o96.showString.
   : java.lang.IndexOutOfBoundsException: 0
   	at scala.collection.LinearSeqOps.apply(LinearSeq.scala:131)
   	at scala.collection.LinearSeqOps.apply$(LinearSeq.scala:128)
   	at scala.collection.immutable.List.apply(List.scala:79)
   	at org.apache.spark.sql.catalyst.plans.logical.BaseEvalPythonUDTF.$anonfun$forwardedColumnMap$2(pythonLogicalOperators.scala:245)
   	at scala.collection.immutable.List.foreach(List.scala:333)
   	at org.apache.spark.sql.catalyst.plans.logical.BaseEvalPythonUDTF.$anonfun$forwardedColumnMap$1(pythonLogicalOperators.scala:234)
   	at org.apache.spark.sql.catalyst.plans.logical.BaseEvalPythonUDTF.$anonfun$forwardedColumnMap$1$adapted(pythonLogicalOperators.scala:229)
   	at scala.Option.foreach(Option.scala:437)
   	at org.apache.spark.sql.catalyst.plans.logical.BaseEvalPythonUDTF.forwardedColumnMap(pythonLogicalOperators.scala:229)
   	at org.apache.spark.sql.catalyst.plans.logical.BaseEvalPythonUDTF.forwardedColumnMap$(pythonLogicalOperators.scala:227)
   	at org.apache.spark.sql.catalyst.plans.logical.BatchEvalPythonUDTF.forwardedColumnMap$lzycompute(pythonLogicalOperators.scala:300)
   	at org.apache.spark.sql.catalyst.plans.logical.BatchEvalPythonUDTF.forwardedColumnMap(pythonLogicalOperators.scala:300)
   	at org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin$$anonfun$6.$anonfun$applyOrElse$49(Optimizer.scala:1825)
   	at org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin$$anonfun$6.$anonfun$applyOrElse$49$adapted(Optimizer.scala:1823)
   	at scala.collection.StrictOptimizedIterableOps.$anonfun$partition$1(StrictOptimizedIterableOps.scala:34)
   	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:576)
   	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:574)
   	at scala.collection.AbstractIterator.foreach(Iterator.scala:1300)
   	at scala.collection.immutable.List.partition(List.scala:588)
   	at org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin$$anonfun$6.applyOrElse(Optimizer.scala:1823)
   	at org.apache.spark.sql.catalyst.optimizer.PushPredicateThroughNonJoin$$anonfun$6.applyOrElse(Optimizer.scala:1715)
   ...
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47032][Python] Add UDTF API for "analyze" method to identify pass-through columns to output table [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on code in PR #45142:
URL: https://github.com/apache/spark/pull/45142#discussion_r1506861839


##########
python/pyspark/sql/udtf.py:
##########
@@ -123,10 +123,17 @@ class SelectedColumn:
     alias : str, default ''
         If non-empty, this is the alias for the column or expression as visible from the UDTF's
         'eval' method. This is required if the expression is not a simple column reference.
+    forwardToOutputTable : bool, default False
+        If true, the UDTF is specifying to Catalyst a metadata property wherein the function call
+        will copy the result of evaluating this column or expression from the most recent input row
+        through to the output table, to a column with the same name specified in the 'alias' field
+        (or the name of the simple column reference otherwise). This is useful because it lets the
+        optimizer push filters or other operations down through the UDTF call to the input table.
     """
 
     name: str
     alias: str = ""
+    forwardToOutputTable: bool = False

Review Comment:
   @dtenedor would it be possible to create a second abstraction rather than an optional field in the select column? For the AI_FORECAST group columns, I'd like to be able to select only the value columns, but pass-through the group columns e.g.
   
   ```
   AnalyzeResult(
       ...,
       select=["v1", "v2"],
       forward_to_output=["dim1", "dim2"]
   )
   ```
   
   this helps materially for AI_FORECAST because the value columns are a known statically-sized types (a double-castable number) but the dimension columns can be variably sized (e.g. strings)



##########
python/pyspark/sql/udtf.py:
##########
@@ -123,10 +123,17 @@ class SelectedColumn:
     alias : str, default ''
         If non-empty, this is the alias for the column or expression as visible from the UDTF's
         'eval' method. This is required if the expression is not a simple column reference.
+    forwardToOutputTable : bool, default False
+        If true, the UDTF is specifying to Catalyst a metadata property wherein the function call
+        will copy the result of evaluating this column or expression from the most recent input row
+        through to the output table, to a column with the same name specified in the 'alias' field
+        (or the name of the simple column reference otherwise). This is useful because it lets the
+        optimizer push filters or other operations down through the UDTF call to the input table.
     """
 
     name: str
     alias: str = ""
+    forwardToOutputTable: bool = False

Review Comment:
   @dtenedor would it be possible to create a second abstraction rather than an optional field in the select column? For the AI_FORECAST group columns, I'd like to be able to select only the value columns, but pass-through the group columns e.g.
   
   ```
   AnalyzeResult(
       ...,
       select=["v1", "v2"],
       forward_to_output=["dim1", "dim2"]
   )
   ```
   
   this helps materially for AI_FORECAST memory accounting because the value columns are a known statically-sized types (a double-castable number) but the dimension columns can be variably sized (e.g. strings)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47032][Python] Add UDTF API for "analyze" method to identify pass-through columns to output table [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on code in PR #45142:
URL: https://github.com/apache/spark/pull/45142#discussion_r1506861839


##########
python/pyspark/sql/udtf.py:
##########
@@ -123,10 +123,17 @@ class SelectedColumn:
     alias : str, default ''
         If non-empty, this is the alias for the column or expression as visible from the UDTF's
         'eval' method. This is required if the expression is not a simple column reference.
+    forwardToOutputTable : bool, default False
+        If true, the UDTF is specifying to Catalyst a metadata property wherein the function call
+        will copy the result of evaluating this column or expression from the most recent input row
+        through to the output table, to a column with the same name specified in the 'alias' field
+        (or the name of the simple column reference otherwise). This is useful because it lets the
+        optimizer push filters or other operations down through the UDTF call to the input table.
     """
 
     name: str
     alias: str = ""
+    forwardToOutputTable: bool = False

Review Comment:
   @dtenedor would it be possible to create a second abstraction rather than an optional field in the select column? For the AI_FORECAST group columns, I'd like to be able to select only the value columns, but pass-through the group columns e.g.
   
   ```
   AnalyzeResult(
       ...,
       select=["v1", "v2"],
       forward_to_output=["dim1", "dim2"]
   )
   ```
   
   this helps materially for AI_FORECAST memory accounting because the value columns are a known statically-sized type (a double-castable number) but the dimension columns can be variably sized (e.g. strings)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47032][Python] Add UDTF API for "analyze" method to identify pass-through columns to output table [spark]

Posted by "nickstanishadb (via GitHub)" <gi...@apache.org>.
nickstanishadb commented on code in PR #45142:
URL: https://github.com/apache/spark/pull/45142#discussion_r1506861839


##########
python/pyspark/sql/udtf.py:
##########
@@ -123,10 +123,17 @@ class SelectedColumn:
     alias : str, default ''
         If non-empty, this is the alias for the column or expression as visible from the UDTF's
         'eval' method. This is required if the expression is not a simple column reference.
+    forwardToOutputTable : bool, default False
+        If true, the UDTF is specifying to Catalyst a metadata property wherein the function call
+        will copy the result of evaluating this column or expression from the most recent input row
+        through to the output table, to a column with the same name specified in the 'alias' field
+        (or the name of the simple column reference otherwise). This is useful because it lets the
+        optimizer push filters or other operations down through the UDTF call to the input table.
     """
 
     name: str
     alias: str = ""
+    forwardToOutputTable: bool = False

Review Comment:
   @dtenedor would it be possible to create a second abstraction rather than an optional field in the select column? For the AI_FORECAST group columns, I'd like to be able to select only the value columns, but pass-through the group columns e.g.
   
   ```
   AnalyzeResult(
       ...,
       select=["v1", "v2"],
       forward_to_output=["dim1", "dim2"]
   )
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [WIP][SPARK-47032][Python] Prototype for adding pass-through columns to Python UDTF API [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #45142:
URL: https://github.com/apache/spark/pull/45142#issuecomment-1955609505

   Note this is a work-in-progress, I'm a bit busy over the next day or so but will add testing and push a commit then :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47032][Python] Add UDTF API for "analyze" method to identify pass-through columns to output table [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on code in PR #45142:
URL: https://github.com/apache/spark/pull/45142#discussion_r1507971562


##########
python/pyspark/sql/udtf.py:
##########
@@ -123,10 +123,17 @@ class SelectedColumn:
     alias : str, default ''
         If non-empty, this is the alias for the column or expression as visible from the UDTF's
         'eval' method. This is required if the expression is not a simple column reference.
+    forwardToOutputTable : bool, default False
+        If true, the UDTF is specifying to Catalyst a metadata property wherein the function call
+        will copy the result of evaluating this column or expression from the most recent input row
+        through to the output table, to a column with the same name specified in the 'alias' field
+        (or the name of the simple column reference otherwise). This is useful because it lets the
+        optimizer push filters or other operations down through the UDTF call to the input table.
     """
 
     name: str
     alias: str = ""
+    forwardToOutputTable: bool = False

Review Comment:
   sure, that sounds good. Let me try to hack that up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [WIP][SPARK-47032][Python] Prototype for adding pass-through columns to Python UDTF API [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45142:
URL: https://github.com/apache/spark/pull/45142#discussion_r1493041669


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/PythonUDF.scala:
##########
@@ -195,13 +198,13 @@ case class PythonUDTF(
 
   override protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): PythonUDTF =
     copy(children = newChildren)
-}

Review Comment:
   This removal looks like a mistake. Could you make CIs happy?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47032][Python] Add UDTF API for "analyze" method to identify pass-through columns to output table [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor commented on PR #45142:
URL: https://github.com/apache/spark/pull/45142#issuecomment-1965542695

   cc @dongjoon-hyun @ueshin I added some more testing, this is ready for review now.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47032][Python] Add UDTF API for "analyze" method to identify pass-through columns to output table [spark]

Posted by "dtenedor (via GitHub)" <gi...@apache.org>.
dtenedor closed pull request #45142: [SPARK-47032][Python] Add UDTF API for "analyze" method to identify pass-through columns to output table
URL: https://github.com/apache/spark/pull/45142


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org