Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2018/06/22 04:23:00 UTC

[jira] [Resolved] (SPARK-24458) Invalid PythonUDF check_1(), requires attributes from more than one child

     [ https://issues.apache.org/jira/browse/SPARK-24458?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-24458.
----------------------------------
    Resolution: Cannot Reproduce

Let me leave this resolved as Cannot Reproduce for the current status.
Please reopen this if it still exists in the master branch.
Otherwise, we should really find the JIRA that fixed this and see if we can backport it.

It would be good to check whether this still exists in 2.3.1 too.


> Invalid PythonUDF check_1(), requires attributes from more than one child
> -------------------------------------------------------------------------
>
>                 Key: SPARK-24458
>                 URL: https://issues.apache.org/jira/browse/SPARK-24458
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: Spark 2.3.0 (local mode)
> Mac OSX
>            Reporter: Abdeali Kothari
>            Priority: Major
>
> I was trying out a very large query execution plan and got the following error:
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o359.simpleString.
> : java.lang.RuntimeException: Invalid PythonUDF check_1(), requires attributes from more than one child.
>  at scala.sys.package$.error(package.scala:27)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
>  at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>  at scala.collection.immutable.List.foldLeft(List.scala:84)
>  at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
>  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
>  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
>  at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:187)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>  at py4j.Gateway.invoke(Gateway.java:282)
>  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  at py4j.commands.CallCommand.execute(CallCommand.java:79)
>  at py4j.GatewayConnection.run(GatewayConnection.java:214)
>  at java.lang.Thread.run(Thread.java:748){code}
> I get a DataFrame (df) after running a lot of Python UDFs over a CSV dataset, dropping some columns along the way. Then I create 3 Python lists (for example, [0.1, 0.2, 0.3, ...]), each of which I convert to a Spark DataFrame using createDataFrame.
> I join all three list-converted DataFrames using crossJoin() and then do a crossJoin with the original data I have. Then I run a Python UDF called check_1, which is something like:
> {code:python}
> def check_1():
>     if 1 == 1:
>         return 'yes'
>     else:
>         return 'no'{code}
> So, it is a Python UDF that takes no arguments and always returns 'yes'. (Note: these UDFs are created on the fly, so for testing I am currently just using this dummy always-'yes' function.)
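> For completeness, the dummy is wrapped as a Spark UDF roughly like this (a sketch; the real wrapping happens on the fly, and the StringType return type is assumed from the 'yes'/'no' output):
> {code:python}
> from pyspark.sql.functions import udf
> from pyspark.sql.types import StringType
>
> # Hypothetical wrapping; the report only shows the plain Python function.
> check_1_udf = udf(check_1, StringType())
> column = check_1_udf()  # invoked with no argument columns
> {code}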
> After I get check_1's output, I convert all of my checks (there could be more than one, but in my current test there is only one) into a map<string, string> column.
> Finally, I apply filter("checks['first'] = 'yes'") to keep the records I need.
> When I apply that filter and then call .explain(), it fails with the error above.
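> Here is a minimal, self-contained sketch of the whole flow (the data and column names are placeholders, and the body of the map-building UDF is my best guess at what python_udf_to_create_map does; only check_1, the crossJoins, the map column, and the filter match the real job):
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql import types as T
>
> spark = SparkSession.builder.master("local").getOrCreate()
>
> # Stand-in for the CSV dataset after the earlier chain of Python UDFs.
> df = spark.createDataFrame([(1,), (2,)], ["id"])
>
> # Three single-column DataFrames built from Python lists via createDataFrame.
> v1 = spark.createDataFrame([(v,) for v in [0.1, 0.2]], ["value"]).selectExpr("value AS computed_v1")
> v2 = spark.createDataFrame([(v,) for v in [0.1, 0.2]], ["value"]).selectExpr("value AS computed_v2")
> v3 = spark.createDataFrame([(v,) for v in [0.1, 0.2]], ["value"]).selectExpr("value AS computed_v3")
>
> # Cross-join the list DataFrames with each other and then with the base data.
> joined = df.crossJoin(v1).crossJoin(v2).crossJoin(v3)
>
> # Zero-argument dummy check UDF, as above.
> check_1 = F.udf(lambda: "yes", T.StringType())
>
> # A second Python UDF that zips check names and check outputs into a
> # map<string,string>, mirroring python_udf_to_create_map in the plan below.
> python_udf_to_create_map = F.udf(
>     lambda names, values: dict(zip(names, values)),
>     T.MapType(T.StringType(), T.StringType()))
>
> checked = joined.withColumn(
>     "checks", python_udf_to_create_map(F.array(F.lit("first")), F.array(check_1())))
>
> # Filtering on the map and asking for the plan is where the error appears.
> checked.filter("checks['first'] = 'yes'").explain()
> {code}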
>  
> Here is the explain() output of the DataFrame just before the filter():
>  
> {noformat}
> *(1) Project [... cols ...]
> +- BatchEvalPython [python_udf_to_create_map([check_1], array(pythonUDF0#1851))], [... cols ...]
>    +- BatchEvalPython [check_1()], [... cols ...]
>       +- InMemoryTableScan [... cols ...]
>          +- InMemoryRelation [... cols ...], true, 10000, StorageLevel(disk, 1 replicas)
>             +- BroadcastNestedLoopJoin BuildLeft, Cross
>                :- BroadcastExchange IdentityBroadcastMode
>                :  +- *(5) Project [... cols ...]
>                :     +- BatchEvalPython [... Python UDF ...], [... cols ...]
>                :        +- *(4) Project [... cols ...]
>                :           +- BatchEvalPython [... Python UDFs ...], [... cols ...]
>                :              +- *(3) Project [... cols ...]
>                :                 +- BatchEvalPython [... Python UDFs ...], [... cols ...]
>                :                    +- *(2) Project [... cols ...]
>                :                       +- BatchEvalPython [ ... Python UDFs ... ], [ ... cols ... ]
>                :                          +- *(1) FileScan csv [ ... cols ... ] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/abdealijk/Documents/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
>                +- CartesianProduct
>                   :- *(6) Project [value#1261 AS computed_v1#1263]
>                   :  +- Scan ExistingRDD[value#1261]
>                   +- CartesianProduct
>                      :- *(7) Project [value#1265 AS computed_v2#1267]
>                      :  +- Scan ExistingRDD[value#1265]
>                      +- *(8) Project [value#1269 AS computed_v3#1271]
>                         +- Scan ExistingRDD[value#1269]{noformat}
> I have simplified the explain() output; let me know if I have removed anything you need.
>  
>  
> I tried to create a simpler reproducible example, but wasn't able to reduce it any further.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org