Posted to issues@spark.apache.org by "Abdeali Kothari (JIRA)" <ji...@apache.org> on 2018/06/16 07:20:00 UTC

[jira] [Commented] (SPARK-24458) Invalid PythonUDF check_1(), requires attributes from more than one child

    [ https://issues.apache.org/jira/browse/SPARK-24458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514709#comment-16514709 ] 

Abdeali Kothari commented on SPARK-24458:
-----------------------------------------

Dug into this a bit more and found that the error comes from:

[sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala]

Here is the state of the internal variables:

{noformat}
udfs.size = 2
udfs = Stream(check_1_func(), check_1_func())
inputsForPlan.size = 94
inputsForPlan = <A lot of the columns I have in the DF>
plan.children.size = 0
plan.children = List()
prunedChildren.size = 0
prunedChildren = List()
{noformat}

Since prunedChildren is empty, I assume attributeMap never gets populated (it stays empty), so every UDF fails the attributeMap.contains check and we end up in:

{code:java}
udfs.filterNot(attributeMap.contains).foreach { udf =>
  sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")
}
{code}


> Invalid PythonUDF check_1(), requires attributes from more than one child
> -------------------------------------------------------------------------
>
>                 Key: SPARK-24458
>                 URL: https://issues.apache.org/jira/browse/SPARK-24458
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.0
>         Environment: Spark 2.3.0 (local mode)
> Mac OSX
>            Reporter: Abdeali Kothari
>            Priority: Major
>
> I was trying to run a very large query execution plan and got this error:
>  
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o359.simpleString.
> : java.lang.RuntimeException: Invalid PythonUDF check_1(), requires attributes from more than one child.
>  at scala.sys.package$.error(package.scala:27)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
>  at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
>  at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
>  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>  at scala.collection.immutable.List.foldLeft(List.scala:84)
>  at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
>  at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
>  at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
>  at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
>  at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:187)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
>  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>  at py4j.Gateway.invoke(Gateway.java:282)
>  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>  at py4j.commands.CallCommand.execute(CallCommand.java:79)
>  at py4j.GatewayConnection.run(GatewayConnection.java:214)
>  at java.lang.Thread.run(Thread.java:748){code}
> I get a dataframe (df) after running a lot of Python UDFs on a CSV dataset, dropping some columns in between. Finally, I create 3 Python lists (for example, [0.1, 0.2, 0.3, ...]) which I convert to Spark DataFrames using createDataFrame.
> I join all three list-converted DataFrames using crossJoin(), and then crossJoin the result with the original data. Then I run a Python UDF, check_1, which is something like:
> {code:java}
> def check_1():
>     if 1 == 1:
>         return 'yes'
>     else:
>         return 'no'{code}
> So, it is a Python UDF which takes no arguments and always returns 'yes'. (Note: this UDF is created on the fly... so for testing, I am currently just using this dummy always-'yes' function.)
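> For reference, this is roughly how such a function gets wrapped as a zero-argument Spark UDF (a sketch of the usual PySpark pattern; check_1_udf and df are placeholder names, not my actual generated code):
> {code:java}
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType
>
> # Wrap the check function (defined above) as a UDF and invoke it with
> # an empty argument list; 'df' stands for my dataframe at this point.
> check_1_udf = F.udf(check_1, StringType())
> df = df.withColumn("check_1", check_1_udf())
> {code}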
> After I get check_1's output, I convert all my checks (there could be more than one, but in my current test I have only one) into a Map(string, string).
> Finally, I apply a filter("checks['first'] = 'yes'") to select the records I need.
> When I run the filter and then call .explain(), it fails with the above error.
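> To make the sequence concrete, here is a heavily simplified sketch of the pipeline (toy values and placeholder column names; as noted at the end, a reduced version like this did not reproduce the error for me, so it only shows the order of operations):
> {code:java}
> from pyspark.sql import SparkSession
> from pyspark.sql import functions as F
> from pyspark.sql.types import StringType
>
> spark = SparkSession.builder.master("local[*]").getOrCreate()
>
> # Stand-in for the CSV-derived dataframe that has already gone
> # through several Python UDFs and had some columns dropped.
> base_df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val"])
>
> # Three Python lists converted to single-column DataFrames.
> v1 = spark.createDataFrame([(x,) for x in [0.1, 0.2]], ["computed_v1"])
> v2 = spark.createDataFrame([(x,) for x in [0.3, 0.4]], ["computed_v2"])
> v3 = spark.createDataFrame([(x,) for x in [0.5, 0.6]], ["computed_v3"])
>
> # crossJoin the three list DataFrames, then crossJoin with the data.
> df = base_df.crossJoin(v1.crossJoin(v2.crossJoin(v3)))
>
> # Run the no-argument check UDF and collect its output into a
> # Map(string, string) column keyed by the check name.
> check_1_udf = F.udf(lambda: "yes", StringType())
> df = df.withColumn("checks", F.create_map(F.lit("first"), check_1_udf()))
>
> # The filter followed by explain() is where my real job hits the
> # "Invalid PythonUDF check_1(), requires attributes from more than
> # one child" RuntimeException.
> df.filter("checks['first'] = 'yes'").explain()
> {code}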
>  
> Here is the explain() output of the dataframe just before the filter():
>  
> {noformat}
> *(1) Project [... cols ...]
> +- BatchEvalPython [python_udf_to_create_map([check_1], array(pythonUDF0#1851))], [... cols ...]
>    +- BatchEvalPython [check_1()], [... cols ...]
>       +- InMemoryTableScan [... cols ...]
>          +- InMemoryRelation [... cols ...], true, 10000, StorageLevel(disk, 1 replicas)
>             +- BroadcastNestedLoopJoin BuildLeft, Cross
>                :- BroadcastExchange IdentityBroadcastMode
>                :  +- *(5) Project [... cols ...]
>                :     +- BatchEvalPython [... Python UDF ...], [... cols ...]
>                :        +- *(4) Project [... cols ...]
>                :           +- BatchEvalPython [... Python UDFs ...], [... cols ...]
>                :              +- *(3) Project [... cols ...]
>                :                 +- BatchEvalPython [... Python UDFs ...], [... cols ...]
>                :                    +- *(2) Project [... cols ...]
>                :                       +- BatchEvalPython [ ... Python UDFs ... ], [ ... cols ... ]
>                :                          +- *(1) FileScan csv [ ... cols ... ] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/abdealijk/Documents/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
>                +- CartesianProduct
>                   :- *(6) Project [value#1261 AS computed_v1#1263]
>                   :  +- Scan ExistingRDD[value#1261]
>                   +- CartesianProduct
>                      :- *(7) Project [value#1265 AS computed_v2#1267]
>                      :  +- Scan ExistingRDD[value#1265]
>                      +- *(8) Project [value#1269 AS computed_v3#1271]
>                         +- Scan ExistingRDD[value#1269]{noformat}
> I have simplified the explain() output. Let me know if I have deleted some data you may need.
>
> I tried creating a simpler reproducible example, but wasn't able to make anything simpler...



