You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Andrew Duffy (JIRA)" <ji...@apache.org> on 2017/11/29 03:59:00 UTC
[jira] [Updated] (SPARK-22641) Pyspark UDF relying on column added
with withColumn after distinct
[ https://issues.apache.org/jira/browse/SPARK-22641?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrew Duffy updated SPARK-22641:
---------------------------------
Description:
We seem to have found an issue with PySpark UDFs interacting with {{withColumn}} when the UDF depends on the column added in {{withColumn}}, but _only_ if {{withColumn}} is performed after a {{distinct()}}.
Simplest repro in a local PySpark shell:
{code}
import pyspark.sql.functions as F
@F.udf
def ident(x):
return x
spark.createDataFrame([{'a': '1'}]) \
.distinct() \
.withColumn('b', F.lit('qq')) \
.withColumn('fails_here', ident('b')) \
.collect()
{code}
This fails with the following exception:
{code}
Run
File
Edit
View
Kernel
Local Scope
24
1
2
3
4
5
6
7
# Initialize
import pyspark.sql as S
import pyspark.sql.functions as F
sc = get_sc()
sqlContext = S.SQLContext(sc)
spark = sqlContext.sparkSession
No results
25
1
2
3
4
@F.udf
def ident(x):
return x
No results
40
1
3
5
4
2
6
spark.createDataFrame([{'a': '1'}]) \
.withColumn('b', F.lit('qq')) \
.collect()
.withColumn('fails_here', ident('b')) \
.distinct() \
No results
Py4JJavaErrorTraceback (most recent call last)
in ()
----> 1 spark.createDataFrame([{'a': '1'}]) .distinct() .withColumn('b', F.lit('qq')) .withColumn('fails_here', ident('b')) .collect()
/opt/palantir/services/.296331252/service/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py in collect(self)
428 """
429 with SCCallSiteSync(self._sc) as css:
--> 430 port = self._jdf.collectToPython()
431 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
432
/opt/palantir/services/.296331252/var/data/envs/python/default/3365517267c0b352b50f13a35d1b2ed1/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/opt/palantir/services/.296331252/service/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/opt/palantir/services/.296331252/var/data/envs/python/default/3365517267c0b352b50f13a35d1b2ed1/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
317 raise Py4JJavaError(
318 "An error occurred while calling {0}{1}{2}.\n".
--> 319 format(target_id, ".", name), value)
320 else:
321 raise Py4JError(
Py4JJavaError: An error occurred while calling o1321.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#306
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:475)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:474)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultCode(HashAggregateExec.scala:474)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:612)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:331)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:372)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2872)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2869)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2869)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2892)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2869)
at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#306 in [a#293]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 58 more
{code}
The odd part is that if you run the code, but remove the {{.distinct()}}, or place it after {{.withColumn("fails_here", ...)}} we don't get the error.
was:
We seem to have found an issue with PySpark UDFs interacting with {{withColumn}} when the UDF depends on the column added in {{withColumn}}, but _only_ if {{withColumn}} is performed after a {{distinct()}}.
Simplest repro in a local PySpark shell:
{code}
import pyspark.sql.functions as F
@F.udf(returnType="integer")
def ident(x):
return x
df = spark.createDataFrame([
{'a': '1', 'nums': ['1']},
{'a': '2', 'nums': ['1', '2']}
])
df2 = df.distinct().withColumn('c', F.lit(1))
df2.show()
df2.withColumn('added', ident(df2['c'])).collect()
{code}
The {{df.show()}} will succeed, but the following collect fails with the following exception:
{code}
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Users/aduffy/git/open_source/spark/python/pyspark/sql/dataframe.py", line 451, in collect
port = self._jdf.collectToPython()
File "/Users/aduffy/git/open_source/spark/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
File "/Users/aduffy/git/open_source/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/Users/aduffy/git/open_source/spark/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o72.collectToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#26
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:512)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$38.apply(HashAggregateExec.scala:511)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultFunction(HashAggregateExec.scala:511)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:657)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:164)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:141)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:138)
at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:361)
at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:409)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:113)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:141)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:138)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:233)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:280)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:3088)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3085)
at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3085)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:3118)
at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3085)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#26 in [a#0,nums#1]
at scala.sys.package$.error(package.scala:27)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
... 58 more
{code}
The odd part is that if you run the code, but remove the {{.distinct()}}, or place it after {{.withColumn("b", ...)}} we don't get the error.
> Pyspark UDF relying on column added with withColumn after distinct
> ------------------------------------------------------------------
>
> Key: SPARK-22641
> URL: https://issues.apache.org/jira/browse/SPARK-22641
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.3.0
> Reporter: Andrew Duffy
>
> We seem to have found an issue with PySpark UDFs interacting with {{withColumn}} when the UDF depends on the column added in {{withColumn}}, but _only_ if {{withColumn}} is performed after a {{distinct()}}.
> Simplest repro in a local PySpark shell:
> {code}
> import pyspark.sql.functions as F
> @F.udf
> def ident(x):
> return x
> spark.createDataFrame([{'a': '1'}]) \
> .distinct() \
> .withColumn('b', F.lit('qq')) \
> .withColumn('fails_here', ident('b')) \
> .collect()
> {code}
> This fails with the following exception:
> {code}
> Run
> File
> Edit
> View
> Kernel
> Local Scope
> 24
> 1
> 2
> 3
> 4
> 5
> 6
> 7
> # Initialize
> import pyspark.sql as S
> import pyspark.sql.functions as F
> sc = get_sc()
> sqlContext = S.SQLContext(sc)
> spark = sqlContext.sparkSession
>
> No results
> 25
> 1
> 2
> 3
> 4
> @F.udf
> def ident(x):
> return x
>
> No results
> 40
> 1
> 3
> 5
> 4
> 2
> 6
> spark.createDataFrame([{'a': '1'}]) \
> .withColumn('b', F.lit('qq')) \
> .collect()
> .withColumn('fails_here', ident('b')) \
> .distinct() \
>
> No results
> Py4JJavaErrorTraceback (most recent call last)
> in ()
> ----> 1 spark.createDataFrame([{'a': '1'}]) .distinct() .withColumn('b', F.lit('qq')) .withColumn('fails_here', ident('b')) .collect()
> /opt/palantir/services/.296331252/service/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py in collect(self)
> 428 """
> 429 with SCCallSiteSync(self._sc) as css:
> --> 430 port = self._jdf.collectToPython()
> 431 return list(_load_from_socket(port, BatchedSerializer(PickleSerializer())))
> 432
> /opt/palantir/services/.296331252/var/data/envs/python/default/3365517267c0b352b50f13a35d1b2ed1/lib/python2.7/site-packages/py4j/java_gateway.pyc in __call__(self, *args)
> 1131 answer = self.gateway_client.send_command(command)
> 1132 return_value = get_return_value(
> -> 1133 answer, self.gateway_client, self.target_id, self.name)
> 1134
> 1135 for temp_arg in temp_args:
> /opt/palantir/services/.296331252/service/spark/python/lib/pyspark.zip/pyspark/sql/utils.py in deco(*a, **kw)
> 61 def deco(*a, **kw):
> 62 try:
> ---> 63 return f(*a, **kw)
> 64 except py4j.protocol.Py4JJavaError as e:
> 65 s = e.java_exception.toString()
> /opt/palantir/services/.296331252/var/data/envs/python/default/3365517267c0b352b50f13a35d1b2ed1/lib/python2.7/site-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
> 317 raise Py4JJavaError(
> 318 "An error occurred while calling {0}{1}{2}.\n".
> --> 319 format(target_id, ".", name), value)
> 320 else:
> 321 raise Py4JError(
> Py4JJavaError: An error occurred while calling o1321.collectToPython.
> : org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: pythonUDF0#306
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:88)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:87)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
> at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
> at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
> at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:87)
> at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:475)
> at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$33.apply(HashAggregateExec.scala:474)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.spark.sql.execution.aggregate.HashAggregateExec.generateResultCode(HashAggregateExec.scala:474)
> at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduceWithKeys(HashAggregateExec.scala:612)
> at org.apache.spark.sql.execution.aggregate.HashAggregateExec.doProduce(HashAggregateExec.scala:148)
> at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85)
> at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
> at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80)
> at org.apache.spark.sql.execution.aggregate.HashAggregateExec.produce(HashAggregateExec.scala:38)
> at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:331)
> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:372)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
> at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
> at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
> at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
> at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
> at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228)
> at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:275)
> at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply$mcI$sp(Dataset.scala:2872)
> at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2869)
> at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:2869)
> at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
> at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2892)
> at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2869)
> at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at py4j.Gateway.invoke(Gateway.java:280)
> at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:214)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Couldn't find pythonUDF0#306 in [a#293]
> at scala.sys.package$.error(package.scala:27)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:94)
> at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:88)
> at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
> ... 58 more
> {code}
> The odd part is that if you run the code, but remove the {{.distinct()}}, or place it after {{.withColumn("fails_here", ...)}} we don't get the error.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org