Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:12:40 UTC

[jira] [Resolved] (SPARK-22205) Incorrect result with user defined agg function followed by a non deterministic function

     [ https://issues.apache.org/jira/browse/SPARK-22205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon resolved SPARK-22205.
----------------------------------
    Resolution: Incomplete

> Incorrect result with  user defined agg function followed by a non deterministic function 
> ------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22205
>                 URL: https://issues.apache.org/jira/browse/SPARK-22205
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: kanika dhuria
>            Priority: Major
>              Labels: bulk-closed
>
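> Repro:
> Create a user-defined aggregate function like:
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
> import org.apache.spark.sql.functions._
> import org.apache.spark.sql.types._
> class AnyUdaf(dtype: DataType) extends UserDefinedAggregateFunction {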
>   def inputSchema:StructType = StructType(StructField("v", dtype) :: Nil)
>   def bufferSchema:StructType = StructType(StructField("v", dtype) :: Nil)
>   def dataType: DataType = dtype
>   def deterministic: Boolean = true
>   def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = null }
>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
>     if (buffer(0) == null) buffer(0) = input(0)
>   }
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     if(buffer1(0) == null) buffer1(0) = buffer2(0)
>   }
>   def evaluate(buffer: Row): Any = { buffer(0) }
> }
> Use this in an agg and follow it with a non-deterministic function such as monotonically_increasing_id. (The constructor requires a data type; IntegerType is assumed here because c2 is built from lit(10).)
> Seq(0,1).toDF("c1").select(col("c1"), lit(10)).toDF("c1", "c2").select(col("c1"), col("c2")).toDF("c1", "c2").groupBy(col("c1")).agg(new AnyUdaf(IntegerType)(col("c2"))).toDF("c1", "c2").select(lit(5), col("c2"), monotonically_increasing_id).show
> +---+---+-----------------------------+
> |  5| c2|monotonically_increasing_id()|
> +---+---+-----------------------------+
> |  5| 10|                            0|
> |  5| 10|                            0|
> +---+---+-----------------------------+


