Posted to jira@arrow.apache.org by "Jorge (Jira)" <ji...@apache.org> on 2020/09/12 21:01:00 UTC

[jira] [Assigned] (ARROW-9937) [Rust] [DataFusion] Average is not correct

     [ https://issues.apache.org/jira/browse/ARROW-9937?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jorge reassigned ARROW-9937:
----------------------------

    Assignee: Jorge

> [Rust] [DataFusion] Average is not correct
> ------------------------------------------
>
>                 Key: ARROW-9937
>                 URL: https://issues.apache.org/jira/browse/ARROW-9937
>             Project: Apache Arrow
>          Issue Type: Bug
>          Components: Rust, Rust - DataFusion
>            Reporter: Jorge
>            Assignee: Jorge
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> The current design of aggregates makes the calculation of the average incorrect.
> Namely, if there are multiple input partitions, the result is the average of the per-partition averages. For example, if the input is in two batches {{[1,2,3]}} and {{[4,5]}}, DataFusion will say "average=3.25" (the mean of 2 and 4.5) rather than "average=3".
>  It also makes it impossible to compute the [geometric mean|https://en.wikipedia.org/wiki/Geometric_mean], distinct sum, and other operations.
> The central issue is that Accumulator returns a `ScalarValue` during partial aggregations via {{get_value}}, but very often a `ScalarValue` is not sufficient information to perform the full aggregation.
> A simple example is the average of 5 numbers, x1, x2, x3, x4, x5, that are distributed in batches of 2, {{[x1, x2], [x3, x4], [x5]}}. Our current calculation performs partial means, {{(x1+x2)/2, (x3+x4)/2, x5}}, and then reduces them using another average, i.e. {{((x1+x2)/2 + (x3+x4)/2 + x5)/3}}, which is not equal to {{(x1 + x2 + x3 + x4 + x5)/5}}.
> I believe that our Accumulators need to pass more information from the partial aggregations to the final aggregation.
> We could consider adopting an API equivalent to [Spark's|https://docs.databricks.com/spark/latest/spark-sql/udaf-scala.html], i.e. have an `update`, a `merge` and an `evaluate`.
> Code with a failing test ({{src/execution/context.rs}}):
> {code:java}
>     #[test]
>     fn simple_avg() -> Result<()> {
>         let schema = Schema::new(vec![
>             Field::new("a", DataType::Int32, false),
>         ]);
>         let batch1 = RecordBatch::try_new(
>             Arc::new(schema.clone()),
>             vec![
>                 Arc::new(Int32Array::from(vec![1, 2, 3])),
>             ],
>         )?;
>         let batch2 = RecordBatch::try_new(
>             Arc::new(schema.clone()),
>             vec![
>                 Arc::new(Int32Array::from(vec![4, 5])),
>             ],
>         )?;
>         let mut ctx = ExecutionContext::new();
>         let provider = MemTable::new(Arc::new(schema), vec![vec![batch1], vec![batch2]])?;
>         ctx.register_table("t", Box::new(provider));
>         let result = collect(&mut ctx, "SELECT AVG(a) FROM t")?;
>         let batch = &result[0];
>         assert_eq!(1, batch.num_columns());
>         assert_eq!(1, batch.num_rows());
>         let values = batch
>             .column(0)
>             .as_any()
>             .downcast_ref::<Float64Array>()
>             .expect("failed to downcast to Float64Array");
>         assert_eq!(values.len(), 1);
>         // avg(1,2,3,4,5) = 3.0
>         assert_eq!(values.value(0), 3.0_f64);
>         Ok(())
>     }
> {code}
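
To make the `update` / `merge` / `evaluate` idea above concrete, here is a minimal sketch in plain Rust. It is not DataFusion's Accumulator trait: the `AvgState` struct and the helper functions `partial`, `avg_via_merge` and `avg_of_partial_avgs` are hypothetical names introduced only for illustration. The point it tries to show is that the partial aggregation has to hand over a (sum, count) pair instead of a single scalar for the final average to come out right.

```rust
/// Hypothetical partial state for AVG (an assumption for illustration,
/// not DataFusion's actual Accumulator API).
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct AvgState {
    sum: f64,
    count: u64,
}

impl AvgState {
    /// Fold one input value into this partition's state.
    pub fn update(&mut self, value: f64) {
        self.sum += value;
        self.count += 1;
    }

    /// Combine another partition's state into this one.
    pub fn merge(&mut self, other: &AvgState) {
        self.sum += other.sum;
        self.count += other.count;
    }

    /// Produce the final average; `None` when no values were seen.
    pub fn evaluate(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.sum / self.count as f64)
        }
    }
}

/// Partial aggregation over a single batch.
pub fn partial(batch: &[f64]) -> AvgState {
    let mut state = AvgState::default();
    for &v in batch {
        state.update(v);
    }
    state
}

/// Final aggregation that merges the (sum, count) states of all batches.
pub fn avg_via_merge(batches: &[Vec<f64>]) -> Option<f64> {
    let mut total = AvgState::default();
    for batch in batches {
        total.merge(&partial(batch));
    }
    total.evaluate()
}

/// The flawed reduction described in the issue: each batch is reduced to
/// a scalar average, and those scalars are then averaged again.
pub fn avg_of_partial_avgs(batches: &[Vec<f64>]) -> Option<f64> {
    let mut outer = AvgState::default();
    for batch in batches {
        if let Some(avg) = partial(batch).evaluate() {
            outer.update(avg);
        }
    }
    outer.evaluate()
}
```

With the two batches from the failing test, {{[1,2,3]}} and {{[4,5]}}, `avg_via_merge` yields 3.0, while `avg_of_partial_avgs` yields 3.25, the incorrect value reported here. The same shape of state (more than one scalar carried between the partial and final steps) is what a geometric mean or a distinct sum would need as well.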



--
This message was sent by Atlassian Jira
(v8.3.4#803005)