Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/01/08 02:58:00 UTC

[jira] [Resolved] (SPARK-26562) countDistinct and user-defined function cannot be used in SELECT

     [ https://issues.apache.org/jira/browse/SPARK-26562?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-26562.
----------------------------------
    Resolution: Cannot Reproduce

I can't reproduce this:

{code}
>>> df.select(F.sum(df['a']), F.count(df['a']), F.countDistinct(df['a']), F.approx_count_distinct(df['a']), F.sum(func(df['a'])) ).show()
+------+--------+-----------------+------------------------+----------------+
|sum(a)|count(a)|count(DISTINCT a)|approx_count_distinct(a)|sum(<lambda>(a))|
+------+--------+-----------------+------------------------+----------------+
|     7|       3|                3|                       3|             3.0|
+------+--------+-----------------+------------------------+----------------+
{code}

in the current master. It would be great if we could identify the JIRA that fixed this and backport it if applicable. I'm leaving this resolved.
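For reference, a minimal self-contained version of the reproduction above (a sketch only: the local SparkSession setup and app name are illustrative, not from the original report; the lambda UDF mirrors the reporter's {{func}}):

{code}
from pyspark.sql import SparkSession, functions as F, types as T

# Assumes a local session; adjust master/app name as needed.
spark = SparkSession.builder.master("local[*]").appName("SPARK-26562-repro").getOrCreate()

df = spark.createDataFrame([[1, 2, 3], [2, 3, 4], [4, 5, 6]], ['a', 'b', 'c'])

# Python UDF returning a constant double, as in the report.
func = F.udf(lambda x: 1.0, T.DoubleType())

# Mix built-in aggregates, countDistinct, and an aggregate over a UDF column.
df.select(
    F.sum(df['a']),
    F.count(df['a']),
    F.countDistinct(df['a']),
    F.approx_count_distinct(df['a']),
    F.sum(func(df['a'])),
).show()
{code}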

> countDistinct and user-defined function cannot be used in SELECT
> ----------------------------------------------------------------
>
>                 Key: SPARK-26562
>                 URL: https://issues.apache.org/jira/browse/SPARK-26562
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.1
>         Environment: Macbook Pro 10.14.2
> spark 2.3.1
>  
>            Reporter: Ravi Kaushik
>            Priority: Minor
>
> df=spark.createDataFrame([ [1,2,3], [2,3,4], [4,5,6] ], ['a', 'b', 'c'])
> from pyspark.sql import functions as F, types as T
> df.select(F.sum(df['a']), F.count(df['a']), F.countDistinct(df['a']), F.approx_count_distinct(df['a'])).show()
> func = F.udf(lambda x: 1.0, T.DoubleType())
>  
> df.select(F.sum(df['a']), F.count(df['a']), F.countDistinct(df['a']), F.approx_count_distinct(df['a']), F.sum(func(df['a'])) ).show()
>  
>  
> Error
>  
> 2019-01-07 18:30:50 ERROR Executor:91 - Exception in task 6.0 in stage 4.0 (TID 223)
> org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Binding attribute, tree: sum#45849
>  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:56)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:91)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:90)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
>  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4$$anonfun$apply$11.apply(TreeNode.scala:335)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.immutable.List.foreach(List.scala:381)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.immutable.List.map(List.scala:285)
>  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:333)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
>  at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$.bindReference(BoundAttribute.scala:90)
>  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind$1.apply(GenerateMutableProjection.scala:38)
>  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$$anonfun$bind$1.apply(GenerateMutableProjection.scala:38)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>  at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.bind(GenerateMutableProjection.scala:38)
>  at org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection$.generate(GenerateMutableProjection.scala:44)
>  at org.apache.spark.sql.execution.SparkPlan.newMutableProjection(SparkPlan.scala:383)
>  at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4$$anonfun$5.apply(HashAggregateExec.scala:119)
>  at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4$$anonfun$5.apply(HashAggregateExec.scala:118)
>  at org.apache.spark.sql.execution.aggregate.AggregationIterator.generateProcessRow(AggregationIterator.scala:180)
>  at org.apache.spark.sql.execution.aggregate.AggregationIterator.<init>(AggregationIterator.scala:199)
>  at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:97)
>  at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:111)
>  at org.apache.spark.sql.execution.aggregate.HashAggregateExec$$anonfun$doExecute$1$$anonfun$4.apply(HashAggregateExec.scala:101)
>  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>  at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:847)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>  at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
>  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
>  at org.apache.spark.scheduler.Task.run(Task.scala:109)
>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
>  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>  at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Couldn't find sum#45849 in [sum#43751L,count#43753L,MS[0]#45328L,MS[1]#45329L,MS[2]#45330L,MS[3]#45331L,MS[4]#45332L,MS[5]#45333L,MS[6]#45334L,MS[7]#45335L,MS[8]#45336L,MS[9]#45337L,MS[10]#45338L,MS[11]#45339L,MS[12]#45340L,MS[13]#45341L,MS[14]#45342L,MS[15]#45343L,MS[16]#45344L,MS[17]#45345L,MS[18]#45346L,MS[19]#45347L,MS[20]#45348L,MS[21]#45349L,MS[22]#45350L,MS[23]#45351L,MS[24]#45352L,MS[25]#45353L,MS[26]#45354L,MS[27]#45355L,MS[28]#45356L,MS[29]#45357L,MS[30]#45358L,MS[31]#45359L,MS[32]#45360L,MS[33]#45361L,MS[34]#45362L,MS[35]#45363L,MS[36]#45364L,MS[37]#45365L,MS[38]#45366L,MS[39]#45367L,MS[40]#45368L,MS[41]#45369L,MS[42]#45370L,MS[43]#45371L,MS[44]#45372L,MS[45]#45373L,MS[46]#45374L,MS[47]#45375L,MS[48]#45376L,MS[49]#45377L,MS[50]#45378L,MS[51]#45379L,sum#43766,count#43758L,a#0L,sum#43752L,count#43754L,MS[0]#43463L,MS[1]#43464L,MS[2]#43465L,MS[3]#43466L,MS[4]#43467L,MS[5]#43468L,MS[6]#43469L,MS[7]#43470L,MS[8]#43471L,MS[9]#43472L,MS[10]#43473L,MS[11]#43474L,MS[12]#43475L,MS[13]#43476L,MS[14]#43477L,MS[15]#43478L,MS[16]#43479L,MS[17]#43480L,MS[18]#43481L,MS[19]#43482L,MS[20]#43483L,MS[21]#43484L,MS[22]#43485L,MS[23]#43486L,MS[24]#43487L,MS[25]#43488L,MS[26]#43489L,MS[27]#43490L,MS[28]#43491L,MS[29]#43492L,MS[30]#43493L,MS[31]#43494L,MS[32]#43495L,MS[33]#43496L,MS[34]#43497L,MS[35]#43498L,MS[36]#43499L,MS[37]#43500L,MS[38]#43501L,MS[39]#43502L,MS[40]#43503L,MS[41]#43504L,MS[42]#43505L,MS[43]#43506L,MS[44]#43507L,MS[45]#43508L,MS[46]#43509L,MS[47]#43510L,MS[48]#43511L,MS[49]#43512L,MS[50]#43513L,MS[51]#43514L,sum#43756,pythonUDF0#43765]
>  at scala.sys.package$.error(package.scala:27)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:97)
>  at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1$$anonfun$applyOrElse$1.apply(BoundAttribute.scala:91)
>  at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
>  ... 59 more
>  
>  



