You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Nicholas Chammas (JIRA)" <ji...@apache.org> on 2016/11/25 21:28:58 UTC

[jira] [Commented] (SPARK-18589) persist() resolves "java.lang.RuntimeException: Invalid PythonUDF (...), requires attributes from more than one child"

    [ https://issues.apache.org/jira/browse/SPARK-18589?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15696717#comment-15696717 ] 

Nicholas Chammas commented on SPARK-18589:
------------------------------------------

cc [~davies] [~hvanhovell]

> persist() resolves "java.lang.RuntimeException: Invalid PythonUDF <lambda>(...), requires attributes from more than one child"
> ------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18589
>                 URL: https://issues.apache.org/jira/browse/SPARK-18589
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>    Affects Versions: 2.0.2, 2.1.0
>         Environment: Python 3.5, Java 8
>            Reporter: Nicholas Chammas
>            Priority: Minor
>
> Smells like another optimizer bug that's similar to SPARK-17100 and SPARK-18254. I'm seeing this on 2.0.2 and on master at commit {{fb07bbe575aabe68422fd3a31865101fb7fa1722}}.
> I don't have a minimal repro for this yet, but the error I'm seeing is:
> {code}
> py4j.protocol.Py4JJavaError: An error occurred while calling o247.count.
> : java.lang.RuntimeException: Invalid PythonUDF <...>(...), requires attributes from more than one child.
>     at scala.sys.package$.error(package.scala:27)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:150)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract$2.apply(ExtractPythonUDFs.scala:149)
>     at scala.collection.immutable.Stream.foreach(Stream.scala:594)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.org$apache$spark$sql$execution$python$ExtractPythonUDFs$$extract(ExtractPythonUDFs.scala:149)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:114)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$$anonfun$apply$2.applyOrElse(ExtractPythonUDFs.scala:113)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$2.apply(TreeNode.scala:312)
>     at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:311)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
>     at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
>     at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:113)
>     at org.apache.spark.sql.execution.python.ExtractPythonUDFs$.apply(ExtractPythonUDFs.scala:93)
>     at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
>     at org.apache.spark.sql.execution.QueryExecution$$anonfun$prepareForExecution$1.apply(QueryExecution.scala:93)
>     at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
>     at scala.collection.immutable.List.foldLeft(List.scala:84)
>     at org.apache.spark.sql.execution.QueryExecution.prepareForExecution(QueryExecution.scala:93)
>     at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
>     at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
>     at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2555)
>     at org.apache.spark.sql.Dataset.count(Dataset.scala:2226)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
>     at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
>     at py4j.Gateway.invoke(Gateway.java:280)
>     at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
>     at py4j.commands.CallCommand.execute(CallCommand.java:79)
>     at py4j.GatewayConnection.run(GatewayConnection.java:214)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The extended plan (cleaned of field names) is as follows:
> {code}
> == Parsed Logical Plan ==
> 'Filter NOT ('expected_prediction = 'prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
> == Analyzed Logical Plan ==
> p1: struct<...>, p2: struct<...>, pair_features: vector, rawPrediction: vector, probability: vector, prediction: double, expected_prediction: float
> Filter NOT (cast(expected_prediction as double) = prediction)
> +- Project [p1, p2, pair_features, rawPrediction, probability, prediction, cast((p1._testing_universal_key = p2._testing_universal_key) as float) AS expected_prediction]
>    +- Project [p1, p2, pair_features, rawPrediction, probability, UDF(rawPrediction) AS prediction]
>       +- Project [p1, p2, pair_features, rawPrediction, UDF(rawPrediction) AS probability]
>          +- Project [p1, p2, pair_features, UDF(pair_features) AS rawPrediction]
>             +- Project [p1, p2, <lambda>(p1.person, p2.person) AS pair_features]
>                +- Project [struct(...) AS p1, struct(...) AS p2]
>                   +- Project [_blocking_key, ..., ...]
>                      +- Join Inner, (_blocking_key = _blocking_key)
>                         :- SubqueryAlias p1
>                         :  +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                         :     +- Project [...]
>                         :        +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                         :           +- Project [...]
>                         :              +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                         :                 +- LogicalRDD [...]
>                         +- SubqueryAlias p2
>                            +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>                               +- Project [...]
>                                  +- Project [primary_key, universal_key, _testing_universal_key, struct(...) AS person]
>                                     +- Project [...]
>                                        +- Project [_testing_universal_key, primary_key, struct(...) AS person]
>                                           +- LogicalRDD [...]
> == Optimized Logical Plan ==
> Project [struct(...) AS p1, struct(...) AS p2, <lambda>(struct(...).person, struct(...).person) AS pair_features, UDF(<lambda>(struct(...).person, struct(...).person)) AS rawPrediction, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS probability, UDF(UDF(<lambda>(struct(...).person, struct(...).person))) AS prediction, cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) AS expected_prediction]
> +- Join Inner, (NOT (cast(cast((struct(...)._testing_universal_key = struct(...)._testing_universal_key) as float) as double) = UDF(UDF(<lambda>(struct(...).person, struct(...).person)))) && (_blocking_key = _blocking_key))
>    :- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>    :  +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>    :     +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
>    :        :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
>    :        :     +- Scan ExistingRDD[...]
>    +- Project [..., <lambda>(dataset_name, primary_key, person) AS _blocking_key]
>       +- Filter isnotnull(<lambda>(dataset_name, primary_key, person))
>          +- InMemoryRelation [...], true, 10000, StorageLevel(memory, 1 replicas)
>             :  +- *Project [primary_key, struct(...) AS person, test_people AS dataset_name]
>             :     +- Scan ExistingRDD[...]
> == Physical Plan ==
> java.lang.RuntimeException: Invalid PythonUDF <lambda>(struct(...).person, struct(...).person), requires attributes from more than one child.
> {code}
> Note the error at the end when Spark tries to print the physical plan. I've scrubbed some Project fields from the plan to simplify the display, but if I've scrubbed anything you think is important let me know.
> I can get around this problem by adding a {{persist()}} right before the operation that fails. The failing operation is a filter.
> Any clues on how I can boil this down to a minimal repro? Any clues about where the problem is?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org