Posted to issues@spark.apache.org by "Pavel Parkhomenko (Jira)" <ji...@apache.org> on 2019/09/06 00:54:00 UTC
[jira] [Commented] (SPARK-28246) State of UDAF: buffer is not cleared
[ https://issues.apache.org/jira/browse/SPARK-28246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16923850#comment-16923850 ]
Pavel Parkhomenko commented on SPARK-28246:
-------------------------------------------
It is about initialization, not merge.
> State of UDAF: buffer is not cleared
> ------------------------------------
>
> Key: SPARK-28246
> URL: https://issues.apache.org/jira/browse/SPARK-28246
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.4.3
> Environment: Ubuntu Linux 16.04
> Reproducible with option --master local[1]
> {code:java}
> $ spark-shell --master local[1]
> {code}
> Reporter: Pavel Parkhomenko
> Priority: Major
>
> The buffer object passed to a UserDefinedAggregateFunction still contains data from the previous group unless initialize explicitly resets it. For example,
> {code:java}
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
> import org.apache.spark.sql.types._
> import org.apache.spark.sql.functions.callUDF
> import java.util.Arrays.asList
> val df = spark.createDataFrame(
>   asList(
>     Row(1, "a"),
>     Row(2, "b")),
>   StructType(List(
>     StructField("id", IntegerType),
>     StructField("value", StringType))))
> trait Min extends UserDefinedAggregateFunction {
>   override val inputSchema: StructType = StructType(Array(StructField("value", StringType)))
>   override val bufferSchema: StructType = StructType(Array(StructField("min", StringType)))
>   override def dataType: DataType = StringType
>   override def deterministic: Boolean = true
>   override def update(buffer: MutableAggregationBuffer, input: Row): Unit =
>     if (input(0) != null && (buffer(0) == null || buffer.getString(0) > input.getString(0))) buffer(0) = input(0)
>   override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = update(buffer1, buffer2)
>   override def evaluate(buffer: Row): Any = buffer(0)
> }
> class GoodMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = None
> }
> class BadMin extends Min {
>   override def initialize(buffer: MutableAggregationBuffer): Unit = {}
> }
> spark.udf.register("goodmin", new GoodMin)
> spark.udf.register("badmin", new BadMin)
> df groupBy "id" agg callUDF("goodmin", $"value") show false
> df groupBy "id" agg callUDF("badmin", $"value") show false
> {code}
> The output is:
> {noformat}
> scala> df groupBy "id" agg callUDF("goodmin", $"value") show false
> +---+--------------+
> |id |goodmin(value)|
> +---+--------------+
> |1  |a             |
> |2  |b             |
> +---+--------------+
> scala> df groupBy "id" agg callUDF("badmin", $"value") show false
> +---+-------------+
> |id |badmin(value)|
> +---+-------------+
> |1  |a            |
> |2  |a            |
> +---+-------------+
> {noformat}
> The only difference between GoodMin and BadMin is buffer initialization: GoodMin resets the buffer in initialize, while BadMin leaves it untouched.
> *This example can be reproduced only with a single worker thread*. To reproduce it, spark-shell must be started with the option
> {code:java}
> spark-shell --master local[1]
> {code}
>
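The behaviour reported above can be modelled without Spark. The following is a minimal sketch in plain Scala, under the assumption that groups are processed one after another against a single shared buffer, with initialize called before each group. That assumption matches the reported output but is not taken from Spark's source. Buffer, MinAgg, and run are hypothetical names introduced only for this illustration.

```scala
// Simplified model of per-group aggregation with one reused buffer.
// All names here are hypothetical; this is not Spark's internal code.
object BufferReuseSketch {
  // A one-slot mutable buffer, standing in for MutableAggregationBuffer.
  final class Buffer { var min: String = null }

  trait MinAgg {
    def initialize(b: Buffer): Unit
    // Same rule as update in the report: keep the smaller non-null string.
    def update(b: Buffer, v: String): Unit =
      if (v != null && (b.min == null || b.min > v)) b.min = v
  }

  // Resets the slot before every group.
  object GoodMin extends MinAgg {
    def initialize(b: Buffer): Unit = { b.min = null }
  }

  // Leaves the slot untouched, so the previous group's value survives.
  object BadMin extends MinAgg {
    def initialize(b: Buffer): Unit = ()
  }

  // Assumption: one buffer object is shared by all groups, and the engine
  // asks the aggregate to initialize it at the start of each group.
  def run(agg: MinAgg, groups: List[(Int, List[String])]): List[(Int, String)] = {
    val shared = new Buffer
    groups.map { case (id, values) =>
      agg.initialize(shared)
      values.foreach(v => agg.update(shared, v))
      (id, shared.min)
    }
  }

  def main(args: Array[String]): Unit = {
    val groups = List(1 -> List("a"), 2 -> List("b"))
    println(run(GoodMin, groups)) // List((1,a), (2,b))
    println(run(BadMin, groups))  // List((1,a), (2,a))
  }
}
```

In this model BadMin returns "a" for group 2 because the stale "a" from group 1 is still in the buffer and compares smaller than "b". If each group received a fresh buffer instead, the no-op initialize would go unnoticed, which may be why the report reproduces only with a single worker thread.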