You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/08/01 21:30:32 UTC

[GitHub] [arrow-datafusion] andygrove opened a new pull request, #3009: WIP: Implement exact median

andygrove opened a new pull request, #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009

   # Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123.
   -->
   
   Closes https://github.com/apache/arrow-datafusion/issues/2925
   
    # Rationale for this change
   <!--
    Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes.  
   -->
   
   Needed for h2o benchmarks.
   
   # What changes are included in this PR?
   <!--
   There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR.
   -->
   
   - Add the ability for accumulators to return either arrays or scalar values
   - Implement new `median` aggregate
   
   # Are there any user-facing changes?
   <!--
   If there are user-facing changes then we may require documentation to be updated before approving the PR.
   -->
   
   Yes, if implementing UDAFs.
   
   <!--
   If there are any breaking changes to public APIs, please add the `api change` label.
   -->


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#issuecomment-1204540607

   Thanks for the review @alamb. I will work on addressing the feedback over the next couple of days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#discussion_r937097331


##########
datafusion/expr/src/accumulator.rs:
##########
@@ -44,3 +44,27 @@ pub trait Accumulator: Send + Sync + Debug {
     /// returns its value based on its current state.
     fn evaluate(&self) -> Result<ScalarValue>;
 }
+
+#[derive(Debug)]
+pub enum AggregateState {
+    Scalar(ScalarValue),
+    Array(ArrayRef),
+}
+
+impl AggregateState {
+    pub fn as_scalar(&self) -> Result<&ScalarValue> {
+        match &self {
+            Self::Scalar(v) => Ok(v),
+            _ => Err(DataFusionError::Execution(
+                "not a scalar aggregate".to_string(),
+            )),

Review Comment:
   ```suggestion
               _ => Err(DataFusionError::Internal(
                   "AggregateState is not a scalar aggregate".to_string(),
               )),
   ```
   
   I think marking this `Internal` might better signal to user if they hit this, they have seen some sort of bug with DataFusion



##########
datafusion/expr/src/accumulator.rs:
##########
@@ -44,3 +44,27 @@ pub trait Accumulator: Send + Sync + Debug {
     /// returns its value based on its current state.
     fn evaluate(&self) -> Result<ScalarValue>;
 }
+
+#[derive(Debug)]
+pub enum AggregateState {

Review Comment:
   This is a very elegant idea. Can you please add docstrings to `AggregateState` explaining what is going on?
   
    I think it would be worth updating the docstrings in the accumulator trait with some discussion / examples of how to use the Array state.
   
   
   



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -206,7 +206,10 @@ impl Accumulator for DistinctCountAccumulator {
             )
         });
 
-        Ok(cols_out)
+        Ok(cols_out
+            .iter()
+            .map(|v| AggregateState::Scalar(v.clone()))
+            .collect())

Review Comment:
   You can probably avoid a copy like:
   
   ```suggestion
           Ok(cols_out
               .into_iter()
               .map(|v| AggregateState::Scalar(v))
               .collect())
   ```



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # Median
+
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::array::{
+    Array, ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
+    Int8Array, PrimitiveArray, PrimitiveBuilder, UInt16Array, UInt32Array, UInt64Array,
+    UInt8Array,
+};
+use arrow::compute::sort;
+use arrow::datatypes::{ArrowPrimitiveType, DataType, Field};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{Accumulator, AggregateState};
+use std::any::Any;
+use std::sync::Arc;
+
+/// MEDIAN aggregate expression. This uses a lot of memory because all values need to be
+/// stored in memory before a result can be computed. If an approximation is sufficient

Review Comment:
   I wonder if it is worth (perhaps as a follow on PR) putting a cap on the number of values DataFusion will try to buffer to compute median and throw a runtime error if that number is exceeded 🤔  That way we could avoid OOM kills 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#discussion_r937114037


##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # Median
+
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::array::{
+    Array, ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
+    Int8Array, PrimitiveArray, PrimitiveBuilder, UInt16Array, UInt32Array, UInt64Array,
+    UInt8Array,
+};
+use arrow::compute::sort;
+use arrow::datatypes::{ArrowPrimitiveType, DataType, Field};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{Accumulator, AggregateState};
+use std::any::Any;
+use std::sync::Arc;
+
+/// MEDIAN aggregate expression. This uses a lot of memory because all values need to be
+/// stored in memory before a result can be computed. If an approximation is sufficient
+/// then APPROX_MEDIAN provides a much more efficient solution.
+#[derive(Debug)]
+pub struct Median {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+impl Median {
+    /// Create a new MEDIAN aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for Median {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(MedianAccumulator {
+            data_type: self.data_type.clone(),
+            all_values: vec![],
+        }))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "median"),
+            self.data_type.clone(),
+            true,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct MedianAccumulator {
+    data_type: DataType,
+    all_values: Vec<ArrayRef>,

Review Comment:
   I wonder if you would be better served here by using an ArrayBuilder (though I realize they are strongly typed so it might be more award -- though it is likely faster)



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # Median
+
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::array::{
+    Array, ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
+    Int8Array, PrimitiveArray, PrimitiveBuilder, UInt16Array, UInt32Array, UInt64Array,
+    UInt8Array,
+};
+use arrow::compute::sort;
+use arrow::datatypes::{ArrowPrimitiveType, DataType, Field};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{Accumulator, AggregateState};
+use std::any::Any;
+use std::sync::Arc;
+
+/// MEDIAN aggregate expression. This uses a lot of memory because all values need to be
+/// stored in memory before a result can be computed. If an approximation is sufficient
+/// then APPROX_MEDIAN provides a much more efficient solution.
+#[derive(Debug)]
+pub struct Median {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+impl Median {
+    /// Create a new MEDIAN aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for Median {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(MedianAccumulator {
+            data_type: self.data_type.clone(),
+            all_values: vec![],
+        }))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "median"),
+            self.data_type.clone(),
+            true,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct MedianAccumulator {
+    data_type: DataType,
+    all_values: Vec<ArrayRef>,
+}
+
+macro_rules! median {
+    ($SELF:ident, $TY:ty, $SCALAR_TY:ident, $TWO:expr) => {{
+        let combined = combine_arrays::<$TY>($SELF.all_values.as_slice())?;
+        if combined.is_empty() {
+            return Ok(ScalarValue::Null);
+        }
+        let sorted = sort(&combined, None)?;
+        let array = sorted
+            .as_any()
+            .downcast_ref::<PrimitiveArray<$TY>>()
+            .ok_or(DataFusionError::Internal(
+                "median! macro failed to cast array to expected type".to_string(),
+            ))?;
+        let len = sorted.len();
+        let mid = len / 2;
+        if len % 2 == 0 {
+            Ok(ScalarValue::$SCALAR_TY(Some(
+                (array.value(mid - 1) + array.value(mid)) / $TWO,
+            )))
+        } else {
+            Ok(ScalarValue::$SCALAR_TY(Some(array.value(mid))))
+        }
+    }};
+}
+
+impl Accumulator for MedianAccumulator {
+    fn state(&self) -> Result<Vec<AggregateState>> {
+        let mut vec: Vec<AggregateState> = self
+            .all_values
+            .iter()
+            .map(|v| AggregateState::Array(v.clone()))
+            .collect();
+        if vec.is_empty() {
+            match self.data_type {

Review Comment:
   Is it correct to produce a single `[0]` element array? Wouldn't that mean that the 0 is now included in the median calculation even though it was not in the original data?



##########
datafusion/physical-expr/src/aggregate/utils.rs:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Utilities used in aggregates

Review Comment:
   👍 



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # Median
+
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::array::{
+    Array, ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
+    Int8Array, PrimitiveArray, PrimitiveBuilder, UInt16Array, UInt32Array, UInt64Array,
+    UInt8Array,
+};
+use arrow::compute::sort;
+use arrow::datatypes::{ArrowPrimitiveType, DataType, Field};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{Accumulator, AggregateState};
+use std::any::Any;
+use std::sync::Arc;
+
+/// MEDIAN aggregate expression. This uses a lot of memory because all values need to be
+/// stored in memory before a result can be computed. If an approximation is sufficient
+/// then APPROX_MEDIAN provides a much more efficient solution.
+#[derive(Debug)]
+pub struct Median {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+impl Median {
+    /// Create a new MEDIAN aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for Median {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(MedianAccumulator {
+            data_type: self.data_type.clone(),
+            all_values: vec![],
+        }))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "median"),
+            self.data_type.clone(),
+            true,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct MedianAccumulator {
+    data_type: DataType,
+    all_values: Vec<ArrayRef>,
+}
+
+macro_rules! median {
+    ($SELF:ident, $TY:ty, $SCALAR_TY:ident, $TWO:expr) => {{
+        let combined = combine_arrays::<$TY>($SELF.all_values.as_slice())?;
+        if combined.is_empty() {
+            return Ok(ScalarValue::Null);
+        }
+        let sorted = sort(&combined, None)?;
+        let array = sorted
+            .as_any()
+            .downcast_ref::<PrimitiveArray<$TY>>()
+            .ok_or(DataFusionError::Internal(
+                "median! macro failed to cast array to expected type".to_string(),
+            ))?;
+        let len = sorted.len();
+        let mid = len / 2;
+        if len % 2 == 0 {
+            Ok(ScalarValue::$SCALAR_TY(Some(
+                (array.value(mid - 1) + array.value(mid)) / $TWO,
+            )))
+        } else {
+            Ok(ScalarValue::$SCALAR_TY(Some(array.value(mid))))
+        }
+    }};
+}
+
+impl Accumulator for MedianAccumulator {
+    fn state(&self) -> Result<Vec<AggregateState>> {
+        let mut vec: Vec<AggregateState> = self
+            .all_values
+            .iter()
+            .map(|v| AggregateState::Array(v.clone()))
+            .collect();
+        if vec.is_empty() {
+            match self.data_type {
+                DataType::UInt8 => vec.push(AggregateState::Array(Arc::new(
+                    UInt8Array::from_value(0_u8, 0),
+                ))),
+                DataType::UInt16 => vec.push(AggregateState::Array(Arc::new(
+                    UInt16Array::from_value(0_u16, 0),
+                ))),
+                DataType::UInt32 => vec.push(AggregateState::Array(Arc::new(
+                    UInt32Array::from_value(0_u32, 0),
+                ))),
+                DataType::UInt64 => vec.push(AggregateState::Array(Arc::new(
+                    UInt64Array::from_value(0_u64, 0),
+                ))),
+                DataType::Int8 => vec.push(AggregateState::Array(Arc::new(
+                    Int8Array::from_value(0_i8, 0),
+                ))),
+                DataType::Int16 => vec.push(AggregateState::Array(Arc::new(
+                    Int16Array::from_value(0_i16, 0),
+                ))),
+                DataType::Int32 => vec.push(AggregateState::Array(Arc::new(
+                    Int32Array::from_value(0_i32, 0),
+                ))),
+                DataType::Int64 => vec.push(AggregateState::Array(Arc::new(
+                    Int64Array::from_value(0_i64, 0),
+                ))),
+                DataType::Float32 => vec.push(AggregateState::Array(Arc::new(
+                    Float32Array::from_value(0_f32, 0),
+                ))),
+                DataType::Float64 => vec.push(AggregateState::Array(Arc::new(
+                    Float64Array::from_value(0_f64, 0),
+                ))),
+                _ => {
+                    return Err(DataFusionError::Execution(
+                        "unsupported data type for median".to_string(),
+                    ))
+                }
+            }
+        }
+        Ok(vec)
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let x = values[0].clone();
+        self.all_values.extend_from_slice(&[x]);
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        for array in states {
+            self.all_values.extend_from_slice(&[array.clone()]);
+        }
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        match self.all_values[0].data_type() {
+            DataType::Int8 => median!(self, arrow::datatypes::Int8Type, Int8, 2),
+            DataType::Int16 => median!(self, arrow::datatypes::Int16Type, Int16, 2),
+            DataType::Int32 => median!(self, arrow::datatypes::Int32Type, Int32, 2),
+            DataType::Int64 => median!(self, arrow::datatypes::Int64Type, Int64, 2),
+            DataType::UInt8 => median!(self, arrow::datatypes::UInt8Type, UInt8, 2),
+            DataType::UInt16 => median!(self, arrow::datatypes::UInt16Type, UInt16, 2),
+            DataType::UInt32 => median!(self, arrow::datatypes::UInt32Type, UInt32, 2),
+            DataType::UInt64 => median!(self, arrow::datatypes::UInt64Type, UInt64, 2),
+            DataType::Float32 => {
+                median!(self, arrow::datatypes::Float32Type, Float32, 2_f32)
+            }
+            DataType::Float64 => {
+                median!(self, arrow::datatypes::Float64Type, Float64, 2_f64)
+            }
+            _ => Err(DataFusionError::Execution(
+                "unsupported data type for median".to_string(),
+            )),
+        }
+    }
+}
+
+/// Combine all non-null values from provided arrays into a single array
+fn combine_arrays<T: ArrowPrimitiveType>(arrays: &[ArrayRef]) -> Result<ArrayRef> {

Review Comment:
   You might be able to do this with `concat` and `take` as well
   
   Untested
   ```rust
   let final_array = concat(arrays);
   let indexes = final_array.iter().enumerate().filter_map(|(i, v)| v.map(|_| i)).collect();
   take(final_array, indexes)
   ```



##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # Median
+
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::array::{
+    Array, ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
+    Int8Array, PrimitiveArray, PrimitiveBuilder, UInt16Array, UInt32Array, UInt64Array,
+    UInt8Array,
+};
+use arrow::compute::sort;
+use arrow::datatypes::{ArrowPrimitiveType, DataType, Field};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{Accumulator, AggregateState};
+use std::any::Any;
+use std::sync::Arc;
+
+/// MEDIAN aggregate expression. This uses a lot of memory because all values need to be
+/// stored in memory before a result can be computed. If an approximation is sufficient
+/// then APPROX_MEDIAN provides a much more efficient solution.
+#[derive(Debug)]
+pub struct Median {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+impl Median {
+    /// Create a new MEDIAN aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for Median {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(MedianAccumulator {
+            data_type: self.data_type.clone(),
+            all_values: vec![],
+        }))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "median"),
+            self.data_type.clone(),
+            true,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct MedianAccumulator {
+    data_type: DataType,
+    all_values: Vec<ArrayRef>,
+}
+
+macro_rules! median {
+    ($SELF:ident, $TY:ty, $SCALAR_TY:ident, $TWO:expr) => {{
+        let combined = combine_arrays::<$TY>($SELF.all_values.as_slice())?;
+        if combined.is_empty() {
+            return Ok(ScalarValue::Null);
+        }
+        let sorted = sort(&combined, None)?;
+        let array = sorted
+            .as_any()
+            .downcast_ref::<PrimitiveArray<$TY>>()
+            .ok_or(DataFusionError::Internal(
+                "median! macro failed to cast array to expected type".to_string(),
+            ))?;
+        let len = sorted.len();
+        let mid = len / 2;
+        if len % 2 == 0 {
+            Ok(ScalarValue::$SCALAR_TY(Some(
+                (array.value(mid - 1) + array.value(mid)) / $TWO,
+            )))
+        } else {
+            Ok(ScalarValue::$SCALAR_TY(Some(array.value(mid))))
+        }
+    }};
+}
+
+impl Accumulator for MedianAccumulator {
+    fn state(&self) -> Result<Vec<AggregateState>> {
+        let mut vec: Vec<AggregateState> = self
+            .all_values
+            .iter()
+            .map(|v| AggregateState::Array(v.clone()))
+            .collect();
+        if vec.is_empty() {
+            match self.data_type {
+                DataType::UInt8 => vec.push(AggregateState::Array(Arc::new(
+                    UInt8Array::from_value(0_u8, 0),
+                ))),
+                DataType::UInt16 => vec.push(AggregateState::Array(Arc::new(
+                    UInt16Array::from_value(0_u16, 0),
+                ))),
+                DataType::UInt32 => vec.push(AggregateState::Array(Arc::new(
+                    UInt32Array::from_value(0_u32, 0),
+                ))),
+                DataType::UInt64 => vec.push(AggregateState::Array(Arc::new(
+                    UInt64Array::from_value(0_u64, 0),
+                ))),
+                DataType::Int8 => vec.push(AggregateState::Array(Arc::new(
+                    Int8Array::from_value(0_i8, 0),
+                ))),
+                DataType::Int16 => vec.push(AggregateState::Array(Arc::new(
+                    Int16Array::from_value(0_i16, 0),
+                ))),
+                DataType::Int32 => vec.push(AggregateState::Array(Arc::new(
+                    Int32Array::from_value(0_i32, 0),
+                ))),
+                DataType::Int64 => vec.push(AggregateState::Array(Arc::new(
+                    Int64Array::from_value(0_i64, 0),
+                ))),
+                DataType::Float32 => vec.push(AggregateState::Array(Arc::new(
+                    Float32Array::from_value(0_f32, 0),
+                ))),
+                DataType::Float64 => vec.push(AggregateState::Array(Arc::new(
+                    Float64Array::from_value(0_f64, 0),
+                ))),
+                _ => {
+                    return Err(DataFusionError::Execution(
+                        "unsupported data type for median".to_string(),
+                    ))
+                }
+            }
+        }
+        Ok(vec)
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let x = values[0].clone();
+        self.all_values.extend_from_slice(&[x]);
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        for array in states {
+            self.all_values.extend_from_slice(&[array.clone()]);
+        }
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        match self.all_values[0].data_type() {
+            DataType::Int8 => median!(self, arrow::datatypes::Int8Type, Int8, 2),

Review Comment:
   Instead of using a macro here, I wonder if you could use the `concat` and  `take` kernels 
   
   https://docs.rs/arrow/19.0.0/arrow/compute/kernels/concat/index.html
   https://docs.rs/arrow/19.0.0/arrow/compute/kernels/take/index.html
   
   Something like (untested):
   
   ```rust
   let sorted = sort(concat(&self.all_values));
   let len = sorted.len();
   let mid = len / 2;
   if len % 2 == 0 {
     let indexes: UInt64Array = [mid-1, mid].into_iter().collect();
     // 🤔  Not sure how to do an average:
     let values = average(take(sorted, indexes)) 
     ScalarValue::try_from_array(values, 0)
   } else {
     ScalarValue::try_from_array(sorted, mid)
   } 
   
   ```
   But the need for an `average` stymies that - though I guess we could implement an `average` kernel in datafusion and then put it back into arrow



##########
datafusion/core/tests/sql/aggregates.rs:
##########
@@ -221,7 +221,7 @@ async fn csv_query_stddev_6() -> Result<()> {
 }
 
 #[tokio::test]
-async fn csv_query_median_1() -> Result<()> {

Review Comment:
   If possible, I would recommend adding a basic test in sql for a median for all the different data types that are supported  (not just on aggregate_test_100 but a dedicated test setup with known data (maybe integers 10, 9, 8, ... 0) 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#issuecomment-1205373816

   > I suspect this approach will be much higher performance
   
   @alamb It isn't clear to me which approach you are referring to here. I assume you are saying that the approach in this PR of using Array rather than ScalarList::Vec is likely more performant?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#issuecomment-1206443236

   @alamb I think this is ready for another look. I added tests and filed a couple of follow-on issues:
   
   - https://github.com/apache/arrow-datafusion/issues/3039
   - https://github.com/apache/arrow-datafusion/issues/3040


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#discussion_r937173972


##########
datafusion/physical-expr/src/aggregate/median.rs:
##########
@@ -0,0 +1,260 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! # Median
+
+use crate::expressions::format_state_name;
+use crate::{AggregateExpr, PhysicalExpr};
+use arrow::array::{
+    Array, ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
+    Int8Array, PrimitiveArray, PrimitiveBuilder, UInt16Array, UInt32Array, UInt64Array,
+    UInt8Array,
+};
+use arrow::compute::sort;
+use arrow::datatypes::{ArrowPrimitiveType, DataType, Field};
+use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_expr::{Accumulator, AggregateState};
+use std::any::Any;
+use std::sync::Arc;
+
+/// MEDIAN aggregate expression. This uses a lot of memory because all values need to be
+/// stored in memory before a result can be computed. If an approximation is sufficient
+/// then APPROX_MEDIAN provides a much more efficient solution.
+#[derive(Debug)]
+pub struct Median {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+impl Median {
+    /// Create a new MEDIAN aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for Median {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(MedianAccumulator {
+            data_type: self.data_type.clone(),
+            all_values: vec![],
+        }))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "median"),
+            self.data_type.clone(),
+            true,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct MedianAccumulator {
+    data_type: DataType,
+    all_values: Vec<ArrayRef>,
+}
+
+macro_rules! median {
+    ($SELF:ident, $TY:ty, $SCALAR_TY:ident, $TWO:expr) => {{
+        let combined = combine_arrays::<$TY>($SELF.all_values.as_slice())?;
+        if combined.is_empty() {
+            return Ok(ScalarValue::Null);
+        }
+        let sorted = sort(&combined, None)?;
+        let array = sorted
+            .as_any()
+            .downcast_ref::<PrimitiveArray<$TY>>()
+            .ok_or(DataFusionError::Internal(
+                "median! macro failed to cast array to expected type".to_string(),
+            ))?;
+        let len = sorted.len();
+        let mid = len / 2;
+        if len % 2 == 0 {
+            Ok(ScalarValue::$SCALAR_TY(Some(
+                (array.value(mid - 1) + array.value(mid)) / $TWO,
+            )))
+        } else {
+            Ok(ScalarValue::$SCALAR_TY(Some(array.value(mid))))
+        }
+    }};
+}
+
+impl Accumulator for MedianAccumulator {
+    fn state(&self) -> Result<Vec<AggregateState>> {
+        let mut vec: Vec<AggregateState> = self
+            .all_values
+            .iter()
+            .map(|v| AggregateState::Array(v.clone()))
+            .collect();
+        if vec.is_empty() {
+            match self.data_type {

Review Comment:
   These arrays have length 0.  I just pushed a refactor to clean this up and make it more obvious.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
andygrove commented on PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#issuecomment-1205368676

   > For the `median` state store, a `Map<value, value_occurence_count>` might be more likely to be space efficient. Though it may require more computations than the current approach, and likely not work with arrow compute kernels.
   
   Clever idea. I'm not sure that would work for floating-point types?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#issuecomment-1205742142

   > @alamb It isn't clear to me which approach you are referring to here. I assume you are saying that the approach in this PR of using Array rather than ScalarList::Vec is likely more performant?
   
   I guess I was saying that making lots of small `Arrays` and concatenating them together may not be all that much more performant than a Scalar::List but I haven't measured it.
   
   > Clever idea. I'm not sure that would work for floating-point types?
   
   It would probably work but likely take up more space 😆  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] codecov-commenter commented on pull request #3009: WIP: Implement exact median

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#issuecomment-1201795012

   # [Codecov](https://codecov.io/gh/apache/arrow-datafusion/pull/3009?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#3009](https://codecov.io/gh/apache/arrow-datafusion/pull/3009?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (a18567a) into [master](https://codecov.io/gh/apache/arrow-datafusion/commit/c7fa789e85025a631ed634881e60c1ed71e8d269?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (c7fa789) will **decrease** coverage by `0.01%`.
   > The diff coverage is `74.73%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master    #3009      +/-   ##
   ==========================================
   - Coverage   85.81%   85.79%   -0.02%     
   ==========================================
     Files         282      285       +3     
     Lines       51531    51769     +238     
   ==========================================
   + Hits        44219    44415     +196     
   - Misses       7312     7354      +42     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/arrow-datafusion/pull/3009?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../physical-expr/src/aggregate/array\_agg\_distinct.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9hZ2dyZWdhdGUvYXJyYXlfYWdnX2Rpc3RpbmN0LnJz) | `80.18% <0.00%> (ø)` | |
   | [datafusion/physical-expr/src/aggregate/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9hZ2dyZWdhdGUvbW9kLnJz) | `25.00% <ø> (ø)` | |
   | [datafusion/physical-expr/src/expressions/mod.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9leHByZXNzaW9ucy9tb2QucnM=) | `100.00% <ø> (ø)` | |
   | [datafusion/proto/src/lib.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9wcm90by9zcmMvbGliLnJz) | `93.47% <0.00%> (ø)` | |
   | [datafusion/proto/src/to\_proto.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9wcm90by9zcmMvdG9fcHJvdG8ucnM=) | `53.21% <ø> (ø)` | |
   | [datafusion/physical-expr/src/aggregate/median.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9hZ2dyZWdhdGUvbWVkaWFuLnJz) | `57.95% <57.95%> (ø)` | |
   | [datafusion/physical-expr/src/aggregate/build\_in.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9hZ2dyZWdhdGUvYnVpbGRfaW4ucnM=) | `89.92% <66.66%> (-0.26%)` | :arrow_down: |
   | [datafusion/expr/src/accumulator.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9leHByL3NyYy9hY2N1bXVsYXRvci5ycw==) | `77.77% <77.77%> (ø)` | |
   | [...sion/physical-expr/src/aggregate/count\_distinct.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9hZ2dyZWdhdGUvY291bnRfZGlzdGluY3QucnM=) | `94.18% <83.33%> (-0.86%)` | :arrow_down: |
   | [...fusion/physical-expr/src/aggregate/sum\_distinct.rs](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZGF0YWZ1c2lvbi9waHlzaWNhbC1leHByL3NyYy9hZ2dyZWdhdGUvc3VtX2Rpc3RpbmN0LnJz) | `92.10% <85.71%> (-0.63%)` | :arrow_down: |
   | ... and [26 more](https://codecov.io/gh/apache/arrow-datafusion/pull/3009/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   Help us with your feedback. Take ten seconds to tell us [how you rate us](https://about.codecov.io/nps?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] ursabot commented on pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
ursabot commented on PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#issuecomment-1206812215

   Benchmark runs are scheduled for baseline = 581934d73dfca7b99e6cc66767b3af3bbad7755f and contender = 245def05940b1fea69d1b75df8a928efb39fc3af. 245def05940b1fea69d1b75df8a928efb39fc3af is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
   Conbench compare runs links:
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] [ec2-t3-xlarge-us-east-2](https://conbench.ursa.dev/compare/runs/c20a38a33061413b98cd84ce6921ab7e...7ff4eaadcea1479a916e56b75ef333b1/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] [test-mac-arm](https://conbench.ursa.dev/compare/runs/b3323a029eef4f068bee88dc23396044...07be473bc7ac408dba377a0338a5bc1d/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] [ursa-i9-9960x](https://conbench.ursa.dev/compare/runs/5d09b95bcec743e2a1b1ef1f9bd99686...782d03f56167447e88163260b514ec0b/)
   [Skipped :warning: Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] [ursa-thinkcentre-m75q](https://conbench.ursa.dev/compare/runs/4204e99761574c218b690084c29b18fe...5402686ffd454b20a9cf97f6a43d8f74/)
   Buildkite builds:
   Supported benchmarks:
   ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
   test-mac-arm: Supported benchmark langs: C++, Python, R
   ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
   ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove merged pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
andygrove merged PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] yjshen commented on pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
yjshen commented on PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#issuecomment-1204699701

   The main concern that departure `row_aggregate` from `aggregate` comes from the intention to do in-place updates for row-based states. I assume we could store pointers in `RowLayout::WordAligned` for varlena states during accumulation, finalizing and inlining them into the row-state while we are spilling later. UDAFs with`Box<dyn Any>` state requires more, perhaps an extra serde provided.
   
   For the `median` state store, a `Map<value, value_occurence_count>` might be more likely to be space efficient. Though it may require more computations than the current approach, and likely not work with arrow compute kernels.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] andygrove commented on a diff in pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
andygrove commented on code in PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#discussion_r938884531


##########
datafusion/core/tests/sql/aggregates.rs:
##########
@@ -221,7 +221,7 @@ async fn csv_query_stddev_6() -> Result<()> {
 }
 
 #[tokio::test]
-async fn csv_query_median_1() -> Result<()> {

Review Comment:
   Added in https://github.com/apache/arrow-datafusion/pull/3009/commits/ef1effdbfcafb9147184aa59dfb52ffcbe7a572d



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #3009: Implement exact median, add `AggregateState`

Posted by GitBox <gi...@apache.org>.
alamb commented on code in PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#discussion_r939155682


##########
datafusion/core/tests/sql/aggregates.rs:
##########
@@ -286,6 +286,156 @@ async fn csv_query_median_3() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn median_i8() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Int8,
+        Arc::new(Int8Array::from(vec![i8::MIN, i8::MIN, 100, i8::MAX])),
+        "-14",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_i16() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Int16,
+        Arc::new(Int16Array::from(vec![i16::MIN, i16::MIN, 100, i16::MAX])),
+        "-16334",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_i32() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Int32,
+        Arc::new(Int32Array::from(vec![i32::MIN, i32::MIN, 100, i32::MAX])),
+        "-1073741774",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_i64() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Int64,
+        Arc::new(Int64Array::from(vec![i64::MIN, i64::MIN, 100, i64::MAX])),
+        "-4611686018427388000",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_u8() -> Result<()> {

Review Comment:
   nice



##########
datafusion/core/tests/sql/mod.rs:
##########
@@ -127,7 +127,13 @@ where
                 l.as_ref().parse::<f64>().unwrap(),
                 r.as_str().parse::<f64>().unwrap(),
             );
-            assert!((l - r).abs() <= 2.0 * f64::EPSILON);
+            if l.is_nan() || r.is_nan() {

Review Comment:
   👍 



##########
datafusion/core/tests/sql/aggregates.rs:
##########
@@ -286,6 +286,156 @@ async fn csv_query_median_3() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn median_i8() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Int8,
+        Arc::new(Int8Array::from(vec![i8::MIN, i8::MIN, 100, i8::MAX])),
+        "-14",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_i16() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Int16,
+        Arc::new(Int16Array::from(vec![i16::MIN, i16::MIN, 100, i16::MAX])),
+        "-16334",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_i32() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Int32,
+        Arc::new(Int32Array::from(vec![i32::MIN, i32::MIN, 100, i32::MAX])),
+        "-1073741774",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_i64() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Int64,
+        Arc::new(Int64Array::from(vec![i64::MIN, i64::MIN, 100, i64::MAX])),
+        "-4611686018427388000",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_u8() -> Result<()> {
+    median_test(
+        "median",
+        DataType::UInt8,
+        Arc::new(UInt8Array::from(vec![u8::MIN, u8::MIN, 100, u8::MAX])),
+        "50",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_u16() -> Result<()> {
+    median_test(
+        "median",
+        DataType::UInt16,
+        Arc::new(UInt16Array::from(vec![u16::MIN, u16::MIN, 100, u16::MAX])),
+        "50",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_u32() -> Result<()> {
+    median_test(
+        "median",
+        DataType::UInt32,
+        Arc::new(UInt32Array::from(vec![u32::MIN, u32::MIN, 100, u32::MAX])),
+        "50",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_u64() -> Result<()> {
+    median_test(
+        "median",
+        DataType::UInt64,
+        Arc::new(UInt64Array::from(vec![u64::MIN, u64::MIN, 100, u64::MAX])),
+        "50",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_f32() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Float32,
+        Arc::new(Float32Array::from(vec![1.1, 4.4, 5.5, 3.3, 2.2])),
+        "3.3",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_f64() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Float64,
+        Arc::new(Float64Array::from(vec![1.1, 4.4, 5.5, 3.3, 2.2])),
+        "3.3",
+    )
+    .await
+}
+
+#[tokio::test]
+async fn median_f64_nan() -> Result<()> {
+    median_test(
+        "median",
+        DataType::Float64,
+        Arc::new(Float64Array::from(vec![1.1, f64::NAN, f64::NAN, f64::NAN])),
+        "NaN", // probably not the desired behavior? - see https://github.com/apache/arrow-datafusion/issues/3039
+    )
+    .await
+}
+
+#[tokio::test]
+async fn approx_median_f64_nan() -> Result<()> {
+    median_test(
+        "approx_median",
+        DataType::Float64,
+        Arc::new(Float64Array::from(vec![1.1, f64::NAN, f64::NAN, f64::NAN])),
+        "NaN", // probably not the desired behavior? - see https://github.com/apache/arrow-datafusion/issues/3039

Review Comment:
   testing for the win!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #3009: Implement exact median

Posted by GitBox <gi...@apache.org>.
alamb commented on PR #3009:
URL: https://github.com/apache/arrow-datafusion/pull/3009#issuecomment-1203916530

   I plan to review this carefully later today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org