You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Davies Liu (JIRA)" <ji...@apache.org> on 2015/12/11 21:42:46 UTC

[jira] [Commented] (SPARK-11885) UDAF may nondeterministically generate wrong results

    [ https://issues.apache.org/jira/browse/SPARK-11885?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15053528#comment-15053528 ] 

Davies Liu commented on SPARK-11885:
------------------------------------

The root cause is that we generate ExprId for ScalaUDAF in Executor, then grouping key and aggregation buffer could have same Id (always bound to grouping key, because it come first), cause wrong result.

Back port https://github.com/apache/spark/pull/9093 into 1.5 branch, so it could be fixed.

> UDAF may nondeterministically generate wrong results
> ----------------------------------------------------
>
>                 Key: SPARK-11885
>                 URL: https://issues.apache.org/jira/browse/SPARK-11885
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.5.2
>            Reporter: Yin Huai
>            Assignee: Davies Liu
>            Priority: Critical
>
> I could not reproduce it in 1.6 branch (it can be easily reproduced in 1.5). I think it is an issue in 1.5 branch.
> Try the following in spark 1.5 (with a cluster) and you can see the problem.
> {code}
> import java.math.BigDecimal
> import org.apache.spark.sql.expressions.MutableAggregationBuffer
> import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
> import org.apache.spark.sql.Row
> import org.apache.spark.sql.types.{StructType, StructField, DataType, DoubleType, LongType}
> class GeometricMean extends UserDefinedAggregateFunction {
>   def inputSchema: StructType =
>     StructType(StructField("value", DoubleType) :: Nil)
>   def bufferSchema: StructType = StructType(
>     StructField("count", LongType) ::
>       StructField("product", DoubleType) :: Nil
>   )
>   def dataType: DataType = DoubleType
>   def deterministic: Boolean = true
>   def initialize(buffer: MutableAggregationBuffer): Unit = {
>     buffer(0) = 0L
>     buffer(1) = 1.0
>   }
>   def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
>     buffer(0) = buffer.getAs[Long](0) + 1
>     buffer(1) = buffer.getAs[Double](1) * input.getAs[Double](0)
>   }
>   def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
>     buffer1(0) = buffer1.getAs[Long](0) + buffer2.getAs[Long](0)
>     buffer1(1) = buffer1.getAs[Double](1) * buffer2.getAs[Double](1)
>   }
>   def evaluate(buffer: Row): Any = {
>     math.pow(buffer.getDouble(1), 1.0d / buffer.getLong(0))
>   }
> }
> sqlContext.udf.register("gm", new GeometricMean)
> val df = Seq(
>   (1, "italy", "emilia", 42, BigDecimal.valueOf(100, 0), "john"),
>   (2, "italy", "toscana", 42, BigDecimal.valueOf(505, 1), "jim"),
>   (3, "italy", "puglia", 42, BigDecimal.valueOf(70, 0), "jenn"),
>   (4, "italy", "emilia", 42, BigDecimal.valueOf(75 ,0), "jack"),
>   (5, "uk", "london", 42, BigDecimal.valueOf(200 ,0), "carl"),
>   (6, "italy", "emilia", 42, BigDecimal.valueOf(42, 0), "john")).
>   toDF("receipt_id", "store_country", "store_region", "store_id", "amount", "seller_name")
> df.registerTempTable("receipts")
>   
> val q = sql("""
> select   store_country,
>          store_region,
>          avg(amount),
>          sum(amount),
>          gm(amount)
> from     receipts
> where    amount > 50
>          and store_country = 'italy'
> group by store_country, store_region
> """)
> q.show
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org