Posted to issues@spark.apache.org by "Abdeali Kothari (JIRA)" <ji...@apache.org> on 2018/06/16 07:20:00 UTC
[jira] [Commented] (SPARK-24458) Invalid PythonUDF check_1(),
requires attributes from more than one child
[ https://issues.apache.org/jira/browse/SPARK-24458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16514709#comment-16514709 ]
Abdeali Kothari commented on SPARK-24458:
-----------------------------------------
I dug into this a bit more and found that the error comes from:
[sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala]
Here are the states of the internal variables:
{{ udfs.size = 2}}
{{ udfs = Stream(check_1_func(), check_1_func())}}
{{ inputsForPlan.size = 94}}
{{ inputsForPlan = <A lot of the columns I have in the DF>}}
{{ plan.children.size = 0}}
{{ plan.children = List()}}
{{ prunedChildren.size = 0}}
{{ prunedChildren = List()}}
Because prunedChildren is empty, I assume the attributeMap is never populated (it stays empty).
That in turn triggers:
{{ udfs.filterNot(attributeMap.contains).foreach { udf =>}}
{{   sys.error(s"Invalid PythonUDF $udf, requires attributes from more than one child.")}}
{{ }}}
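To make that reasoning concrete, here is a minimal sketch in plain Python (not Scala, and not Spark's actual code). It assumes the rule only accepts a UDF when a single child of the plan supplies all the columns the UDF references. Ordinary sets stand in for Spark's attribute sets, and the names extract, attribute_map and InvalidPythonUDF are hypothetical stand-ins chosen for illustration.

```python
# Simplified model of the final check described above.
# Plain Python sets stand in for Spark's attribute sets; this is not Spark code.


class InvalidPythonUDF(RuntimeError):
    """Raised when no single child can evaluate a UDF (hypothetical name)."""


def extract(udfs, children):
    """Assign each UDF to the first child whose output covers its inputs.

    udfs: dict mapping a UDF name to the set of column names it references.
    children: list of sets, each holding the output columns of one child plan.
    Returns a dict mapping each UDF name to the index of the chosen child.
    """
    attribute_map = {}
    for index, child_output in enumerate(children):
        for name, references in udfs.items():
            # A UDF is assigned to a child only if that child alone supplies its inputs.
            if name not in attribute_map and references <= child_output:
                attribute_map[name] = index
    # Any UDF left unassigned is rejected with the message seen in the stack trace.
    for name in udfs:
        if name not in attribute_map:
            raise InvalidPythonUDF(
                f"Invalid PythonUDF {name}(), requires attributes from more than one child."
            )
    return attribute_map


# A zero-argument UDF references no columns, so any single child satisfies it.
print(extract({"check_1": set()}, [{"a", "b"}]))

# With no children at all, the assignment loop never runs and the same UDF is rejected.
try:
    extract({"check_1": set()}, [])
except InvalidPythonUDF as err:
    print(err)
```

In this model the error message is misleading for the case above: check_1 needs no attributes at all, and it is rejected only because the plan node has zero children to attach it to.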
> Invalid PythonUDF check_1(), requires attributes from more than one child
> -------------------------------------------------------------------------
>
> Key: SPARK-24458
> URL: https://issues.apache.org/jira/browse/SPARK-24458
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.3.0
> Environment: Spark 2.3.0 (local mode)
> Mac OSX
> Reporter: Abdeali Kothari
> Priority: Major
>
> I was trying out a very large query execution plan I have and I got the error:
>
> {code:java}
> py4j.protocol.Py4JJavaError: An error occurred while calling o359.simpleString.
> : java.lang.RuntimeException: Invalid PythonUDF check_1(), requires attributes from more than one child.
> at scala.sys.package$.error(package.scala:27)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:182)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:181)
> at scala.collection.immutable.Stream.foreach(Stream.scala:594)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:181)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:118)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:114)
> at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:94)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:87)
> at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
> at scala.collection.immutable.List.foldLeft(List.scala:84)
> at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:87)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:77)
> at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:77)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
> at org.apache.spark.sql.execution.QueryExecution$$anonfun$simpleString$1.apply(QueryExecution.scala:187)
> at org.apache.spark.sql.execution.QueryExecution.stringOrError(QueryExecution.scala:100)
> at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:187)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:282)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:748){code}
> I get a dataframe (df) after a lot of PythonUDFs running on a CSV dataset and I drop some columns in between. Finally, I create 3 Python lists (for example, [0.1, 0.2, 0.3, ...]), each of which I convert to a Spark DataFrame using createDataFrame.
> I join all three list-converted-dataframes using crossJoin() and then do a crossJoin with the original data I have. Then I run a Python UDF which is check_1. check_1 is something like:
> {code:java}
> def check_1():
>     if 1 == 1:
>         return 'yes'
>     else:
>         return 'no'{code}
> So, it is a Python UDF which takes in no argument and always returns 'yes'. (Note: This UDF is created on the fly... so for testing, I am currently just using this dummy always 'yes' function)
> After I get check_1's output, I convert all my checks (there could be more than one, but my current test has only one) into a Map(string, string).
> Finally, I try to do a filter("checks['first'] = 'yes'") to filter the records I need.
> When I try to do the filter and then do a .explain() it fails with the above error.
>
> Here is the explain of the dataframe up until before I do the filter():
>
> {noformat}
> *(1) Project [... cols ...]
> +- BatchEvalPython [python_udf_to_create_map([check_1], array(pythonUDF0#1851))], [... cols ...]
>    +- BatchEvalPython [check_1()], [... cols ...]
>       +- InMemoryTableScan [... cols ...]
>          +- InMemoryRelation [... cols ...], true, 10000, StorageLevel(disk, 1 replicas)
>             +- BroadcastNestedLoopJoin BuildLeft, Cross
>                :- BroadcastExchange IdentityBroadcastMode
>                :  +- *(5) Project [... cols ...]
>                :     +- BatchEvalPython [... Python UDF ...], [... cols ...]
>                :        +- *(4) Project [... cols ...]
>                :           +- BatchEvalPython [... Python UDFs ...], [... cols ...]
>                :              +- *(3) Project [... cols ...]
>                :                 +- BatchEvalPython [... Python UDFs ...], [... cols ...]
>                :                    +- *(2) Project [... cols ...]
>                :                       +- BatchEvalPython [ ... Python UDFs ... ], [ ... cols ... ]
>                :                          +- *(1) FileScan csv [ ... cols ... ] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/Users/abdealijk/Documents/data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<...
>                +- CartesianProduct
>                   :- *(6) Project [value#1261 AS computed_v1#1263]
>                   :  +- Scan ExistingRDD[value#1261]
>                   +- CartesianProduct
>                      :- *(7) Project [value#1265 AS computed_v2#1267]
>                      :  +- Scan ExistingRDD[value#1265]
>                      +- *(8) Project [value#1269 AS computed_v3#1271]
>                         +- Scan ExistingRDD[value#1269]{noformat}
> I have simplified the explain() output. Let me know if I have deleted some data you may need.
>
>
> I tried creating a simpler reproducible example, but wasn't able to make anything simpler ....