Posted to issues@spark.apache.org by "Jungtaek Lim (Jira)" <ji...@apache.org> on 2019/09/06 02:25:00 UTC

[jira] [Comment Edited] (SPARK-28246) State of UDAF: buffer is not cleared

    [ https://issues.apache.org/jira/browse/SPARK-28246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923875#comment-16923875 ] 

Jungtaek Lim edited comment on SPARK-28246 at 9/6/19 2:24 AM:
--------------------------------------------------------------

The implicit rule is that when initialize (or an equivalent method) is present, Spark calls it, and it should reset everything to a default value. JVM developers tend to expect uninitialized things to hold a default value, but that is a guarantee the JVM provides, not Spark. Spark doesn't set a default value and lets end users initialize the buffer themselves. That's natural, because unlike the JVM, Spark doesn't know what the "default value" of an accumulator is.
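To make the rule concrete, here is a simplified, hypothetical model in plain Scala with no Spark dependency. The names Buffer, MinAgg, GoodMinAgg, BadMinAgg and runGroups are illustrative assumptions, not Spark APIs. The model assumes, as the single-threaded repro in the report suggests, that one mutable buffer is reused across groups and that initialize is the only place it gets reset. Under that assumption it mirrors the GoodMin/BadMin pair from the report and yields the same (2, a) result for the bad variant.

```scala
// Hypothetical stand-in for MutableAggregationBuffer: a single mutable slot.
final class Buffer { var min: String = null }

// Mirrors the Min trait from the report: same update/evaluate, pluggable initialize.
trait MinAgg {
  def initialize(buffer: Buffer): Unit
  def update(buffer: Buffer, input: String): Unit =
    if (input != null && (buffer.min == null || buffer.min > input)) buffer.min = input
  def evaluate(buffer: Buffer): String = buffer.min
}

object GoodMinAgg extends MinAgg {
  // Resets the slot, so nothing survives from a previous group.
  def initialize(buffer: Buffer): Unit = buffer.min = null
}

object BadMinAgg extends MinAgg {
  // Leaves whatever the previous group wrote in place.
  def initialize(buffer: Buffer): Unit = ()
}

// Assumed driver: one buffer is allocated once and reused for every group,
// with initialize called before each group's rows are fed in.
def runGroups(agg: MinAgg, groups: Seq[(Int, Seq[String])]): Seq[(Int, String)] = {
  val buffer = new Buffer
  groups.map { case (key, values) =>
    agg.initialize(buffer)
    values.foreach(v => agg.update(buffer, v))
    key -> agg.evaluate(buffer)
  }
}

val groups = Seq(1 -> Seq("a"), 2 -> Seq("b"))
println(runGroups(GoodMinAgg, groups)) // List((1,a), (2,b))
println(runGroups(BadMinAgg, groups))  // List((1,a), (2,a))
```

In this model the leak comes entirely from the no-op initialize: for group 2 the slot still holds "a", and "a" > "b" is false, so update never overwrites it.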

I guess an explanation of the flow for UDAF (the sequence of method calls Spark makes) is missing from both the javadoc of the UDAF class and sql-getting-started.md. So I feel this could be a documentation issue, but it is still not a bug.
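The kind of flow that could be documented can be sketched as a call trace. This is a hypothetical, plain-Scala model of one plausible ordering for a two-phase aggregation (partial aggregation per partition, then a final merge) of a single group whose rows sit in two partitions. The Buf type, the free-standing lifecycle functions and the partition layout are assumptions for illustration; the exact sequence Spark uses depends on the physical plan.

```scala
import scala.collection.mutable.ArrayBuffer

// Records each lifecycle call so the order can be inspected.
val calls = ArrayBuffer.empty[String]

final class Buf { var min: String = null }

def initialize(b: Buf): Unit = { calls += "initialize"; b.min = null }
def update(b: Buf, in: String): Unit = {
  calls += s"update($in)"
  if (in != null && (b.min == null || b.min > in)) b.min = in
}
def merge(b1: Buf, b2: Buf): Unit = {
  calls += s"merge(${b2.min})"
  if (b2.min != null && (b1.min == null || b1.min > b2.min)) b1.min = b2.min
}
def evaluate(b: Buf): String = { calls += "evaluate"; b.min }

// Hypothetical layout: one group's rows split across two partitions.
val partitions = Seq(Seq("c", "a"), Seq("b"))

// Phase 1 (partial): each partition fills its own initialized buffer.
val partials = partitions.map { rows =>
  val b = new Buf
  initialize(b)
  rows.foreach(update(b, _))
  b
}

// Phase 2 (final): a fresh buffer is initialized, then the partials are merged into it.
val result = new Buf
initialize(result)
partials.foreach(merge(result, _))
val answer = evaluate(result)

println(answer) // a
// initialize -> update(c) -> update(a) -> initialize -> update(b) -> initialize -> merge(a) -> merge(b) -> evaluate
println(calls.mkString(" -> "))
```

Every buffer, including the one receiving merges, goes through initialize first, which is why initialize is the natural place to establish the aggregate's starting value.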


was (Author: kabhwan):
The implicit rule is that when initialize (or an equivalent method) is present, Spark calls it, and it should reset everything to a default value. JVM developers tend to expect uninitialized things to hold a default value, but that is a guarantee the JVM provides, not Spark. Spark doesn't set a default value and lets end users initialize the buffer themselves. That's natural, because unlike the JVM, Spark doesn't know what the "default value" is.

I guess an explanation of the flow for UDAF (the sequence of method calls Spark makes) is missing from both the javadoc of the UDAF class and sql-getting-started.md. So I feel this could be a documentation issue, but it is still not a bug.

> State of UDAF: buffer is not cleared
> ------------------------------------
>
>                 Key: SPARK-28246
>                 URL: https://issues.apache.org/jira/browse/SPARK-28246
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.3
>         Environment: Ubuntu Linux 16.04
> Reproducible with option --master local[1]
> {code:java}
> $ spark-shell --master local[1]
> {code}
>            Reporter: Pavel Parkhomenko
>            Priority: Major
>
> The buffer object for UserDefinedAggregateFunction contains data from the previous iteration. For example,
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions.callUDF
> import java.util.Arrays.asList
> val df = spark.createDataFrame(
>   asList(
>     Row(1, "a"),
>     Row(2, "b")),
>   StructType(List(
>     StructField("id", IntegerType),
>     StructField("value", StringType))))
> trait Min extends UserDefinedAggregateFunction {
>   override val inputSchema: StructType = StructType(Array(StructField("value", StringType)))
>   override val bufferSchema: StructType = StructType(Array(StructField("min", StringType)))
>   override def dataType: DataType = StringType
>   override def deterministic: Boolean = true
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
>     if (input(0) != null && (buffer(0) == null || buffer.getString(0) > input.getString(0))) buffer(0) = input(0)
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)
>   override def evaluate(buffer: Row): Any = buffer(0)
> }
> class GoodMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = None
> }
> class BadMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {}
> }
> spark.udf.register("goodmin", new GoodMin)
> spark.udf.register("badmin", new BadMin)
> df groupBy "id" agg callUDF("goodmin", $"value") show false
> df groupBy "id" agg callUDF("badmin", $"value") show false
> {code}
> Output is
> {noformat}
> scala> df groupBy "id" agg callUDF("goodmin", $"value") show false
> +---+--------------+
> |id |goodmin(value)|
> +---+--------------+
> |1  |a             |
> |2  |b             |
> +---+--------------+
> scala> df groupBy "id" agg callUDF("badmin", $"value") show false
> +---+-------------+
> |id |badmin(value)|
> +---+-------------+
> |1  |a            |
> |2  |a            |
> +---+-------------+
> {noformat}
> The only difference between GoodMin and BadMin is buffer initialization.
> *This example can be reproduced only with a single worker thread*. To reproduce it, spark-shell must be run with the option
> {code:java}
> spark-shell --master local[1]
> {code}
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
