You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Pavel Parkhomenko (Jira)" <ji...@apache.org> on 2019/09/06 00:54:00 UTC

[jira] [Commented] (SPARK-28246) State of UDAF: buffer is not cleared

    [ https://issues.apache.org/jira/browse/SPARK-28246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923850#comment-16923850 ] 

Pavel Parkhomenko commented on SPARK-28246:
-------------------------------------------

It is about initialization, not merge.

> State of UDAF: buffer is not cleared
> ------------------------------------
>
>                 Key: SPARK-28246
>                 URL: https://issues.apache.org/jira/browse/SPARK-28246
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>         Environment: Ubuntu Linux 16.04
> Reproducible with option --master local[1]
> {code:java}
> $ spark-shell --master local[1]
> {code}
>            Reporter: Pavel Parkhomenko
>            Priority: Major
>
> Buffer object for UserDefinedAggregateFunction contains data from previous iteration. For example,
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions.callUDF
> import java.util.Arrays.asList
> val df = spark.createDataFrame(
>   asList(
>     Row(1, "a"),
>     Row(2, "b")),
>   StructType(List(
>     StructField("id", IntegerType),
>     StructField("value", StringType))))
> trait Min extends UserDefinedAggregateFunction {
>   override val inputSchema: StructType = StructType(Array(StructField("value", StringType)))
>   override val bufferSchema: StructType = StructType(Array(StructField("min", StringType)))
>   override def dataType: DataType = StringType
>   override def deterministic: Boolean = true
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
>     if (input(0) != null && (buffer(0) == null || buffer.getString(0) > input.getString(0))) buffer(0) = input(0)
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)
>   override def evaluate(buffer: Row): Any = buffer(0)
> }
> class GoodMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = None
> }
> class BadMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {}
> }
> spark.udf.register("goodmin", new GoodMin)
> spark.udf.register("badmin", new BadMin)
> df groupBy "id" agg callUDF("goodmin", $"value") show false
> df groupBy "id" agg callUDF("badmin", $"value") show false
> {code}
> Output is
> {noformat}
> scala> df groupBy "id" agg callUDF("goodmin", $"value") show false
> +---+--------------+
> |id |goodmin(value)|
> +---+--------------+
> |1  |a             |
> |2  |b             |
> +---+--------------+
> scala> df groupBy "id" agg callUDF("badmin", $"value") show false
> +---+-------------+
> |id |badmin(value)|
> +---+-------------+
> |1  |a            |
> |2  |a            |
> +---+-------------+
> {noformat}
> The difference between GoodMin and BadMin is a buffer initialization.
> *This example could be reproduced with a single worker thread only*. To reproduce it is mandatory to run spark shell with option
> {code:java}
> spark-shell --master local[1]
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org