You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2022/01/06 22:31:54 UTC

[GitHub] [arrow-datafusion] realno opened a new pull request #1525: Add stddev operator

realno opened a new pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525


   # Which issue does this PR close?
   
   This PR covers one of the functions (stddev) in #1486 
   
    # Rationale for this change
   
   This change is to add functions to calculate standard deviation and variance to DataFusion.
   
   # What changes are included in this PR?
   
   Two aggregation functions `variance` and `stddev` are added to DataFusion. 
   Also added some arithmetic functions (add, mul and div) to `ScalarValue` since they are likely to be commonly used by many other operators. There are limitations to the current form of the functions - please see the code. It may be worth creating a new issue to capture the work properly.
   
   # Are there any user-facing changes?
   
   No change to existing functionalities or APIs. Two new functions are added and are available through SQL interface.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on pull request #1525: Add `stddev` and `variance`

Posted by GitBox <gi...@apache.org>.
realno commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1009567186


   FYI @alamb , the PR is ready: https://github.com/apache/arrow-datafusion/pull/1547


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno edited a comment on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno edited a comment on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1008174462


   @alamb thanks for the thorough review! After it is merged I will open a followup PR to add `batch_update` and `batch_merge` as discussed.
   
   I want to discuss the aggregator interface a bit further. The `update` function doesn't seem to provide much value with the current architecture, it seems under all circumstances `batch_update` will perform better. Would it be a reasonable choice to just keep `batch_update` (and remove `update`)? This way we can get rid a lot of code dealing with record level ScalarValue arithmetic. IMO it only makes sense to differentiate the two if `update` can provide additional value such as lower latency in streaming context. 
   
   With that said I am still new to the code base and could have missed something here. It would be nice to hear more people's opinion. Also please let me know if there is a more appropriate forum to discuss this.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780100249



##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.

Review comment:
       Do you have the plan and support the decimal type?
   Now I'm implementing the decimal data type in the datafusion.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780443231



##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.

Review comment:
       Yes, we should support decimal according to spec. I am glad to see you are working on adding decimal - which is the reason I left it out at the moment. Please let me know once it's merged I am happy to open another PR to add it. Once this PR is merged I will create an issue to track this. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780096756



##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {

Review comment:
       I think we don't need to public this function and just like `is_stddev_support_arg_type` method.

##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -84,9 +86,13 @@ pub use nth_value::NthValue;
 pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
 pub use rank::{dense_rank, percent_rank, rank};
 pub use row_number::RowNumber;
+pub(crate) use stddev::is_stddev_support_arg_type;
+pub use stddev::{stddev_return_type, Stddev};

Review comment:
       Do we need to public `stddev_return_type` and `Stddev` as a lib function?

##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.

Review comment:
       in the doc of PG https://www.postgresql.org/docs/9.1/functions-aggregate.html
   The returned type include the double floating-point and numeric/decimal

##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.

Review comment:
       Do you have the plan and support the decimal type?
   Now I'm implementing the decimal data type in the datafusion.

##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));

Review comment:
       Same comments about decimal type like stddev function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780582709



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       Completely agree. My feeling is these basic operations could be implemented in ScalarValue since many operators will depend on them. I want to pick your brain (and other contributor's too) if you think this is a good idea - this deserves a seperate discussion. Also with the way ScalarValue is implemented currently each operator needs a high match block, maybe using generics can make it a little easier? 
   
   Here are some options I propose:
   1. Remove the redundency, I can either move sum to Scalavalue or the other direction is fine. Personally I prefer to have it in ScalarValue.
   2. Leave the code as is until we have a discussion/conclusion around how to handle arithmetic operation for ScalarValue. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1009369485


   > > Perhaps we could add some comments to update clarifying that for aggregators should prefer to implement batch_update and update is present only for backwards compatibility / ease of use?
   
   > I see, that makes sense. It is a good idea to add some comments.
   
   I proposed some doc clarification in https://github.com/apache/arrow-datafusion/pull/1542


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780335960



##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -212,6 +222,26 @@ pub fn create_aggregate_expr(
                 "AVG(DISTINCT) aggregations are not available".to_string(),
             ));
         }
+        (AggregateFunction::Variance, false) => Arc::new(expressions::Variance::new(
+            coerced_phy_exprs[0].clone(),
+            name,
+            return_type,
+        )),
+        (AggregateFunction::Variance, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "VARIANCE(DISTINCT) aggregations are not available".to_string(),
+            ));
+        }
+        (AggregateFunction::Stddev, false) => Arc::new(expressions::Stddev::new(
+            coerced_phy_exprs[0].clone(),
+            name,
+            return_type,
+        )),
+        (AggregateFunction::Stddev, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "VARIANCE(DISTINCT) aggregations are not available".to_string(),

Review comment:
       ```suggestion
                   "STDDEV(DISTINCT) aggregations are not available".to_string(),
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno edited a comment on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno edited a comment on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1008174462


   @alamb thanks for the thorough review! I will open a followup PR to add `batch_update` and `batch_merge` as discussed.
   
   I want to discuss the aggregator interface a bit further. The `update` function doesn't seem to provide much value with the current architecture, it seems under all circumstances `batch_update` will perform better. Would it be a reasonable choice to just keep `batch_update` (and remove `update`)? This way we can get rid a lot of code dealing with record level ScalarValue arithmetic. IMO it only makes sense to differentiate the two if `update` can provide additional value such as lower latency in streaming context. 
   
   With that said I am still new to the code base and could have missed something here. It would be nice to hear more people's opinion. Also please let me know if there is a more appropriate forum to discuss this.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1008174462


   @alamb thanks for the through review! I will open a followup PR to add `batch_update` and `batch_merge` as discussed.
   
   I want to discuss the aggregator interface a bit further. The `update` function doesn't seem to provide much value with the current architecture, it seems under all circumstances `batch_update` will perform better. Would it be a reasonable choice to just keep `batch_update` (and remove `update`)? This way we can get rid a lot of code dealing with record level ScalarValue arithmetic. IMO it only makes sense to differentiate the two if `update` can provide additional value such as lower latency in streaming context. 
   
   With that said I am still new to the code base and could have missed something here. It would be nice to hear more people's opinion. Also please let me know if there is a more appropriate forum to discuss this.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780628753



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without

Review comment:
       First of all, thanks for suggesting this approach, I was able to get it working after making `evaluate_to_scalar` to `pub(crate)`, it will reduce the amount of code significantly. 
   
   Though after some thinking I am leaning towards having a separate discussion and make the change in a different PR. Here are some of my thoughts:
   1. It requires changing the scope of an internal function
   2. As you mentioned, there are quite a bit of performance overhead
   3. I think ScalarValue arithmetic is important to many future operators, we can take the chance to provide official support. It feels right to formally discuss the topic and review if this is the best way to achieve it. 
   4. This method doesn't support uptype when overflow happens. This is useful because for aggregation functions it is quite common the results requires more accuracy than the original type. 
   5. Some other functions such as `sum` and `avg` also have similar implementations. Though this is not a good reason to keep as is but it shows the value of official Scalar arithmetic support.
   
   Please let me know your thoughts. If makes sense I am happy to create a new issue for the work and join the discussion. Thanks! 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1008776459


   Unless anyone has another other thoughts on this PR I'll plan to merge it later today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780591785



##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -84,9 +86,13 @@ pub use nth_value::NthValue;
 pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
 pub use rank::{dense_rank, percent_rank, rank};
 pub use row_number::RowNumber;
+pub(crate) use stddev::is_stddev_support_arg_type;
+pub use stddev::{stddev_return_type, Stddev};

Review comment:
       👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780582709



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       Completely agree. My feeling is these basic operations could be implemented in ScalarValue since many operators will depend on them. I want to pick your brain (and other contributor's too) if you think this is a good idea - this deserves a seperate discussion. Also with the way ScalarValue is implemented currently each operator needs a huge match block, maybe using generics can make it a little easier? 
   
   Here are some options I propose:
   1. Remove the redundency, I can either move sum to Scalavalue or the other direction is fine. Personally I prefer to have it in ScalarValue.
   2. Leave the code as is until we have a discussion/conclusion around how to handle arithmetic operation for ScalarValue. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780099721



##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.

Review comment:
       in the doc of PG https://www.postgresql.org/docs/9.1/functions-aggregate.html
   The returned type include the double floating-point and numeric/decimal




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780589214



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       The different value from postgres and google is because of the nature of stddev, there are two versions: 1. population and 2. sample. The one in this PR is population, looks like the default one with postgres and google is sample. The difference in the calculation is very minimal I can include the sampe version in the PR as well. Good catch! 
   
   So my proposal is to add two functions: stddev_pop and stddev_s, does this look reasonable? @alamb 
   
   And I will look into the inconsistency between query runs. Thanks! 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780707255



##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for Variance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(VarianceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "m2"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute variance
+#[derive(Debug)]
+pub struct VarianceAccumulator {
+    m2: ScalarValue,
+    mean: ScalarValue,
+    count: u64,
+}
+
+impl VarianceAccumulator {
+    /// Creates a new `VarianceAccumulator`
+    pub fn try_new() -> Result<Self> {
+        Ok(Self {
+            m2: ScalarValue::from(0 as f64),
+            mean: ScalarValue::from(0 as f64),
+            count: 0,
+        })
+    }
+
+    pub fn get_count(&self) -> u64 {
+        self.count
+    }
+
+    pub fn get_mean(&self) -> ScalarValue {
+        self.mean.clone()
+    }
+
+    pub fn get_m2(&self) -> ScalarValue {
+        self.m2.clone()
+    }
+}
+
+impl Accumulator for VarianceAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.count),
+            self.mean.clone(),
+            self.m2.clone(),
+        ])
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+        let values = &values[0];
+        let is_empty = values.is_null();
+
+        if !is_empty {
+            let new_count = self.count + 1;
+            let delta1 = ScalarValue::add(values, &self.mean.arithmetic_negate())?;

Review comment:
       Sorry let me try to explain it a bit better.
   
   The online algorithm uses an intermediate value for each record called m2. Here is how m2 is calculated:
   `m2(n) = m2(n-1) + (delta1*delta2)`
   So the calculation of m2(n) depends on the value of m2(n-1). And it is similar for delta1 and delta2 - they depends on mean(n-1). i.e. the algorithm is iterative. So what I meant was this algorithm can't benefit from complete vectorization - for each batch it has to be iterative. 
   
   But I see your point that we can avoid per-record type conversion and matching since in `batch_update` we can use the raw array. I think I can make the change in this PR. Thanks for pointing it out.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780732423



##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.

Review comment:
       An issue is created to capture the work. https://github.com/apache/arrow-datafusion/issues/1529




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780589214



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       The different value from postgres and google is because of the nature of stddev, there are two versions: 1. population and 2. sample. The one in this PR is population, looks like the default one with postgres and google is sample. The difference in the calculation is very minimal I can include the sampe version in the PR as well. Good catch! 
   
   So my proposal is to add two functions: stddev_pop and stddev_samp (following Postgres standard), and have stddev default to stddev_samp. Does this look reasonable? @alamb 
   
   And I will look into the inconsistency between query runs. Thanks! 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1007822391


   > This is a really nice piece of work @realno - thank you so much ❤️
   > 
   > I especially like the thorough testing.
   > 
   > I did not review the referenced papers, but I did run some basic tests against postgres and I got some strange results
   > 
   > ```shell
   > cargo run datafusion-cli
   > ```
   > 
   > Sometimes the answers are different from query to query
   > 
   > ```
   > ❯ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
   > +--------------------+
   > | STDDEV(sq.column1) |
   > +--------------------+
   > | 0.7760297817881877 |
   > +--------------------+
   > 1 row in set. Query took 0.008 seconds.
   > ❯ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
   > +--------------------+
   > | STDDEV(sq.column1) |
   > +--------------------+
   > | NaN                |
   > +--------------------+
   > ```
   > 
   > And neither of those answers matches postgres:
   > 
   > ```
   > alamb=# select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
   >          stddev         
   > ------------------------
   >  0.95043849529221686157
   > (1 row)
   > ```
   > 
   > Postgres and google sheets do match <img alt="Screen Shot 2022-01-07 at 4 15 53 PM" width="368" src="https://user-images.githubusercontent.com/490673/148608489-9e64e81c-a627-4991-b3b5-079f19030c16.png">
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1007897342


   @alamb All the comments are addressed, there are some questions/discussion please take a look when you have time. Otherwise the PR is ready.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780103988



##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));

Review comment:
       Same comments about decimal type like stddev function.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1008307171


   > I want to discuss the aggregator interface a bit further. The update function doesn't seem to provide much value with the current architecture, it seems under all circumstances batch_update will perform better. Would it be a reasonable choice to just keep batch_update (and remove update)? This way we can get rid a lot of code dealing with record level ScalarValue arithmetic. IMO it only makes sense to differentiate the two if update can provide additional value such as lower latency in streaming context.
   
   I another other rationale for `update` is to lower the barrier to entry for new users to write `Accumulator`s  (otherwise you have to understand Arrow arrays and compute kernels, etc to write one), as well as being backwards compatible with the original interface
   
   Perhaps we could add some comments to `update` clarifying that for aggregators should prefer to implement `batch_update` and `update` is present only for backwards compatibility / ease of use?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780096756



##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {

Review comment:
       I think we don't need to public this function and just like `is_stddev_support_arg_type` method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780097748



##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -84,9 +86,13 @@ pub use nth_value::NthValue;
 pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
 pub use rank::{dense_rank, percent_rank, rank};
 pub use row_number::RowNumber;
+pub(crate) use stddev::is_stddev_support_arg_type;
+pub use stddev::{stddev_return_type, Stddev};

Review comment:
       Do we need to public `stddev_return_type` and `Stddev` as a lib function?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780582296



##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for Variance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(VarianceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "m2"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute variance
+#[derive(Debug)]
+pub struct VarianceAccumulator {
+    m2: ScalarValue,
+    mean: ScalarValue,
+    count: u64,
+}
+
+impl VarianceAccumulator {
+    /// Creates a new `VarianceAccumulator`
+    pub fn try_new() -> Result<Self> {
+        Ok(Self {
+            m2: ScalarValue::from(0 as f64),
+            mean: ScalarValue::from(0 as f64),
+            count: 0,
+        })
+    }
+
+    pub fn get_count(&self) -> u64 {
+        self.count
+    }
+
+    pub fn get_mean(&self) -> ScalarValue {
+        self.mean.clone()
+    }
+
+    pub fn get_m2(&self) -> ScalarValue {
+        self.m2.clone()
+    }
+}
+
+impl Accumulator for VarianceAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.count),
+            self.mean.clone(),
+            self.m2.clone(),
+        ])
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+        let values = &values[0];
+        let is_empty = values.is_null();
+
+        if !is_empty {
+            let new_count = self.count + 1;
+            let delta1 = ScalarValue::add(values, &self.mean.arithmetic_negate())?;

Review comment:
       Completely agree. I have plan to investigate this as followup PR. The current challenge is the algorithm will loose parallelizability if using a batch friendly algorithm. And I need to spend more time to understand the code. One question I have is will there be a chance `update` and `batch_update` can be used in the same job, i.e. if one job can call `update` on some data and `batch_update` on some other data. Reason for that is the online version of the algorithm requires an intermediate value to be calculated so it is not compatible with batch mode, that is, we can only do all batch or all online. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb merged pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb merged pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno edited a comment on pull request #1525: Add `stddev` and `variance`

Posted by GitBox <gi...@apache.org>.
realno edited a comment on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1009549122


   > Thanks again @realno -- would you like me to file a ticket for the the follow on work?
   > 
   > (specifically removing `ScalarValue::add` etc and implementing `update_batch` instead?
   
   I have the changes ready, will open a PR today. If you prefer to have a ticket as reference I can create it too.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780639143



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       The inconsistency between query runs are now fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb edited a comment on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb edited a comment on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1009352178


   Thanks again @realno  -- would you like me to file a ticket for the the follow on work?
   
   (specifically removing `ScalarValue::add` etc and implementing `update_batch` instead?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno removed a comment on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno removed a comment on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1007822391


   > This is a really nice piece of work @realno - thank you so much ❤️
   > 
   > I especially like the thorough testing.
   > 
   > I did not review the referenced papers, but I did run some basic tests against postgres and I got some strange results
   > 
   > ```shell
   > cargo run datafusion-cli
   > ```
   > 
   > Sometimes the answers are different from query to query
   > 
   > ```
   > ❯ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
   > +--------------------+
   > | STDDEV(sq.column1) |
   > +--------------------+
   > | 0.7760297817881877 |
   > +--------------------+
   > 1 row in set. Query took 0.008 seconds.
   > ❯ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
   > +--------------------+
   > | STDDEV(sq.column1) |
   > +--------------------+
   > | NaN                |
   > +--------------------+
   > ```
   > 
   > And neither of those answers matches postgres:
   > 
   > ```
   > alamb=# select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
   >          stddev         
   > ------------------------
   >  0.95043849529221686157
   > (1 row)
   > ```
   > 
   > Postgres and google sheets do match <img alt="Screen Shot 2022-01-07 at 4 15 53 PM" width="368" src="https://user-images.githubusercontent.com/490673/148608489-9e64e81c-a627-4991-b3b5-079f19030c16.png">
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno closed pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno closed pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1007822391






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780660105



##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for Variance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(VarianceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "m2"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute variance
+#[derive(Debug)]
+pub struct VarianceAccumulator {
+    m2: ScalarValue,
+    mean: ScalarValue,
+    count: u64,
+}
+
+impl VarianceAccumulator {
+    /// Creates a new `VarianceAccumulator`
+    pub fn try_new() -> Result<Self> {
+        Ok(Self {
+            m2: ScalarValue::from(0 as f64),
+            mean: ScalarValue::from(0 as f64),
+            count: 0,
+        })
+    }
+
+    pub fn get_count(&self) -> u64 {
+        self.count
+    }
+
+    pub fn get_mean(&self) -> ScalarValue {
+        self.mean.clone()
+    }
+
+    pub fn get_m2(&self) -> ScalarValue {
+        self.m2.clone()
+    }
+}
+
+impl Accumulator for VarianceAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.count),
+            self.mean.clone(),
+            self.m2.clone(),
+        ])
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+        let values = &values[0];
+        let is_empty = values.is_null();
+
+        if !is_empty {
+            let new_count = self.count + 1;
+            let delta1 = ScalarValue::add(values, &self.mean.arithmetic_negate())?;

Review comment:
       I don't fully understand the discussion here about  online and parallelism with respect to `update` and `update_batch`
   
   `update_batch` is actually what the HashAggregator calls  during execution. The default implementation of `update_batch` simply iterates over each row of the input array, converting it to a `ScalarValue`, and calls `update` convert the  https://github.com/apache/arrow-datafusion/blob/415c5e124af18a05500514f78604366d860dcf5a/datafusion/src/physical_plan/mod.rs#L572-L583
   
   My mental picture of aggregation is like this:
   
   ```
   ┌───────────────────────┐             ┌──────────────────────┐                                       
   │                       │             │                      │                                       
   │  Input RecordBatch 0  │ update_batch│     Accumulator      │                                       
   │                       │────────────▶│                      │───────┐                               
   │                       │             │                      │       │                               
   └───────────────────────┘             └──────────────────────┘       │                               
                                                                        │                               
                                                                        │     merge                     
                                                                        │                               
   ┌───────────────────────┐             ┌──────────────────────┐       │       ┌──────────────────────┐
   │                       │             │                      │       │       │                      │
   │  Input RecordBatch 1  │ update_batch│     Accumulator      │       │       │     Accumulator      │
   │                       │────────────▶│                      │───────┼──────▶│                      │
   │                       │             │                      │       │       │                      │
   └───────────────────────┘             └──────────────────────┘       │       └──────────────────────┘
                                                                        │                               
                                                                        │                               
                                                                        │                               
                                                                        │                               
              ...                                   ...                 │                               
                                                                        │                               
                                                                        │                               
                                                                        │                               
   ┌───────────────────────┐             ┌──────────────────────┐       │                               
   │                       │             │                      │       │                               
   │  Input RecordBatch N  │ update_batch│     Accumulator      │       │                               
   │                       │────────────▶│                      │───────┘                               
   │                       │             │                      │                                       
   └───────────────────────┘             └──────────────────────┘                                       
   ```

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without

Review comment:
       > I think ScalarValue arithmetic is important to many future operators,
   
   I think that we should be trying to avoid `ScalarValue` arithmetic if at all possible, and instead use "vectorized" calculations (aka computations using arrow arrays). While this coding style is more cumbersome it is one of the key advantages for datafusion (and columnar execution engines in general)
   
   The reason that `ScalarValue` arithmetic is so slow is that for each row, there is overhead to switch based on the type (aka the large `match` statements required in this PR). Using `Array`s does this type switch once per array so you only pay the cost once for 1000s of rows. 
   
   I think having a separate discussion / PR is a good. 
   
   That being said, I agree that `ScalarValue::sum()` makes more sense that a free `sum` function (as it is easier to find) -- so perhaps as a follow on to this PR we can combine the two implementations




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1009352178


   Thanks again @realno  -- would you like me to file a ticket for the the follow on work?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] matthewmturner commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
matthewmturner commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1007492724


   Perhaps @Dandandan would be interested in this as he was involved in db-benchmark which this will help


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780444768



##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));

Review comment:
       Please see reply from stddev. Will create new PR once decimal support is added.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780516560



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without

Review comment:
       I think we could use the expression evaluator here rather than writing a giant match statement. 
   
   
   Something like the following 
   
   ```
   
       /// Multiply two numeric ScalarValues
       pub fn mul(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
           let props = ExecutionProps::new();
           let const_evaluator = ConstEvaluator::new(&props);
   
           // Create a LogicalPlan::Expr representing lhs * rhs
           let e = Expr::Literal(lhs.clone()) * Expr::Literal(rhs.clone());
           const_evaluator.evaluate_to_scalar(e)
       }
   ```
   
   Benefits: 
   1. require (much less) code
   2. have built in support for `Decimal` and other nice things
   
   Drawbacks
   1. It might be slower at runtime without some additional special case handling of ScalarValues, but we can do that as a follow on PR
   

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       I think this code is redundant with 
   
   ```
   pub(super) fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
   ```
   
   https://github.com/apache/arrow-datafusion/blob/9d3186693b614db57143adbd81c82a60752a8bac/datafusion/src/physical_plan/expressions/sum.rs#L266-L349
   

##########
File path: datafusion/src/scalar.rs
##########
@@ -3081,4 +3357,209 @@ mod tests {
             DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
         );
     }
+
+    macro_rules! test_scalar_op {
+        ($OP:ident, $LHS:expr, $LHS_TYPE:ident, $RHS:expr, $RHS_TYPE:ident, $RESULT:expr, $RESULT_TYPE:ident) => {{
+            let v1 = &ScalarValue::from($LHS as $LHS_TYPE);
+            let v2 = &ScalarValue::from($RHS as $RHS_TYPE);
+            assert_eq!(
+                ScalarValue::$OP(v1, v2).unwrap(),
+                ScalarValue::from($RESULT as $RESULT_TYPE)
+            );
+        }};
+    }
+
+    macro_rules! test_scalar_op_err {
+        ($OP:ident, $LHS:expr, $LHS_TYPE:ident, $RHS:expr, $RHS_TYPE:ident) => {{
+            let v1 = &ScalarValue::from($LHS as $LHS_TYPE);
+            let v2 = &ScalarValue::from($RHS as $RHS_TYPE);
+            let actual = ScalarValue::add(v1, v2).is_err();

Review comment:
       I believe this should be referring to `$OP` rather than `add`:
   
   ```suggestion
               let actual = ScalarValue::$OP(v1, v2).is_err();
   ```

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without
+        // writing a hige match block.
+        // TODO: Add support for decimal types
+        match (lhs, rhs) {
+            (ScalarValue::Decimal128(_, _, _), _) |
+            (_, ScalarValue::Decimal128(_, _, _)) => {
+                Err(DataFusionError::Internal(
+                    "Addition with Decimals are not supported for now".to_string()
+                ))
+            },
+            // f64 / _
+            (ScalarValue::Float64(f1), ScalarValue::Float64(f2)) => {
+                Ok(ScalarValue::Float64(Some(f1.unwrap() + f2.unwrap())))

Review comment:
       These `unwrap`s will panic if either argument is `None` (aka represents NULL).

##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for Variance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(VarianceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "m2"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute variance

Review comment:
       I suggest bringing the algorithmic reference from the PR description into the code
   
   ```suggestion
   /// An accumulator to compute variance
   ///
   /// The algorithm used is an online implementation and numerically stable. It is based on this paper:
   /// Welford, B. P. (1962). "Note on a method for calculating corrected sums of squares and products". 
   /// Technometrics. 4 (3): 419–420. doi:10.2307/1266577. JSTOR 1266577.
   ///
   /// It has been analyzed here:
   /// Ling, Robert F. (1974). "Comparison of Several Algorithms for Computing Sample Means and Variances".
   /// Journal of the American Statistical Association. 69 (348): 859–866. doi:10.2307/2286154. JSTOR 2286154.
   ```

##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -84,9 +86,13 @@ pub use nth_value::NthValue;
 pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
 pub use rank::{dense_rank, percent_rank, rank};
 pub use row_number::RowNumber;
+pub(crate) use stddev::is_stddev_support_arg_type;
+pub use stddev::{stddev_return_type, Stddev};
 pub(crate) use sum::is_sum_support_arg_type;
 pub use sum::{sum_return_type, Sum};
 pub use try_cast::{try_cast, TryCastExpr};
+pub(crate) use variance::is_variance_support_arg_type;
+pub use variance::{variance_return_type, Variance};

Review comment:
       ```suggestion
   pub (crate) use variance::{variance_return_type, Variance};
   ```

##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for Stddev {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(StddevAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![

Review comment:
       this is wonderful

##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -84,9 +86,13 @@ pub use nth_value::NthValue;
 pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
 pub use rank::{dense_rank, percent_rank, rank};
 pub use row_number::RowNumber;
+pub(crate) use stddev::is_stddev_support_arg_type;
+pub use stddev::{stddev_return_type, Stddev};

Review comment:
       Put another way, perhaps this could be 
   
   ```suggestion
   pub (crate) use stddev::{stddev_return_type, Stddev};
   ```

##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for Variance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(VarianceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "m2"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute variance
+#[derive(Debug)]
+pub struct VarianceAccumulator {
+    m2: ScalarValue,
+    mean: ScalarValue,
+    count: u64,
+}
+
+impl VarianceAccumulator {
+    /// Creates a new `VarianceAccumulator`
+    pub fn try_new() -> Result<Self> {
+        Ok(Self {
+            m2: ScalarValue::from(0 as f64),
+            mean: ScalarValue::from(0 as f64),
+            count: 0,
+        })
+    }
+
+    pub fn get_count(&self) -> u64 {
+        self.count
+    }
+
+    pub fn get_mean(&self) -> ScalarValue {
+        self.mean.clone()
+    }
+
+    pub fn get_m2(&self) -> ScalarValue {
+        self.m2.clone()
+    }
+}
+
+impl Accumulator for VarianceAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.count),
+            self.mean.clone(),
+            self.m2.clone(),
+        ])
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+        let values = &values[0];
+        let is_empty = values.is_null();
+
+        if !is_empty {
+            let new_count = self.count + 1;
+            let delta1 = ScalarValue::add(values, &self.mean.arithmetic_negate())?;

Review comment:
       Using `ScalarValue`s like this to accumulate each operation is likely to be quite slow during runtime. However, I think it would be fine to put in as a first initial implementation and then implement an optimized version using `update_batch` and arrow compute kernels as a follow on PR
   
   ```rust
       /// updates the accumulator's state from a vector of arrays.
       fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] matthewmturner commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
matthewmturner commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1007492724


   Perhaps @Dandandan would be interested in this as he was involved in db-benchmark which this will help


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] liukun4515 commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
liukun4515 commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780665339



##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.

Review comment:
       > Yes, we should support decimal according to spec. I am glad to see you are working on adding decimal - which is the reason I left it out at the moment. Please let me know once it's merged I am happy to open another PR to add it. Once this PR is merged I will create an issue to track this.
   
   please open a issue to track this.
   you can add it to #122.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780787572



##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for Variance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(VarianceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "m2"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute variance
+#[derive(Debug)]
+pub struct VarianceAccumulator {
+    m2: ScalarValue,
+    mean: ScalarValue,
+    count: u64,
+}
+
+impl VarianceAccumulator {
+    /// Creates a new `VarianceAccumulator`
+    pub fn try_new() -> Result<Self> {
+        Ok(Self {
+            m2: ScalarValue::from(0 as f64),
+            mean: ScalarValue::from(0 as f64),
+            count: 0,
+        })
+    }
+
+    pub fn get_count(&self) -> u64 {
+        self.count
+    }
+
+    pub fn get_mean(&self) -> ScalarValue {
+        self.mean.clone()
+    }
+
+    pub fn get_m2(&self) -> ScalarValue {
+        self.m2.clone()
+    }
+}
+
+impl Accumulator for VarianceAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.count),
+            self.mean.clone(),
+            self.m2.clone(),
+        ])
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+        let values = &values[0];
+        let is_empty = values.is_null();
+
+        if !is_empty {
+            let new_count = self.count + 1;
+            let delta1 = ScalarValue::add(values, &self.mean.arithmetic_negate())?;

Review comment:
       >  So what I meant was this algorithm can't benefit from complete vectorization - for each batch it has to be iterative.
   
   Ah that makes sense 👍 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1008337842


   > > I want to discuss the aggregator interface a bit further. The update function doesn't seem to provide much value with the current architecture, it seems under all circumstances batch_update will perform better. Would it be a reasonable choice to just keep batch_update (and remove update)? This way we can get rid a lot of code dealing with record level ScalarValue arithmetic. IMO it only makes sense to differentiate the two if update can provide additional value such as lower latency in streaming context.
   > 
   > I another other rationale for `update` is to lower the barrier to entry for new users to write `Accumulator`s (otherwise you have to understand Arrow arrays and compute kernels, etc to write one), as well as being backwards compatible with the original interface
   > 
   > Perhaps we could add some comments to `update` clarifying that for aggregators should prefer to implement `batch_update` and `update` is present only for backwards compatibility / ease of use?
   
   I see, that makes sense. It is a good idea to add some comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on pull request #1525: Add `stddev` and `variance`

Posted by GitBox <gi...@apache.org>.
realno commented on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1009549122


   > Thanks again @realno -- would you like me to file a ticket for the the follow on work?
   > 
   > (specifically removing `ScalarValue::add` etc and implementing `update_batch` instead?
   
   I have the changes ready, will open a PR today. If you prefer to have a ticket as reference I can create it too. :) 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] Dandandan commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
Dandandan commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780335960



##########
File path: datafusion/src/physical_plan/aggregates.rs
##########
@@ -212,6 +222,26 @@ pub fn create_aggregate_expr(
                 "AVG(DISTINCT) aggregations are not available".to_string(),
             ));
         }
+        (AggregateFunction::Variance, false) => Arc::new(expressions::Variance::new(
+            coerced_phy_exprs[0].clone(),
+            name,
+            return_type,
+        )),
+        (AggregateFunction::Variance, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "VARIANCE(DISTINCT) aggregations are not available".to_string(),
+            ));
+        }
+        (AggregateFunction::Stddev, false) => Arc::new(expressions::Stddev::new(
+            coerced_phy_exprs[0].clone(),
+            name,
+            return_type,
+        )),
+        (AggregateFunction::Stddev, true) => {
+            return Err(DataFusionError::NotImplemented(
+                "VARIANCE(DISTINCT) aggregations are not available".to_string(),

Review comment:
       ```suggestion
                   "STDDEV(DISTINCT) aggregations are not available".to_string(),
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780583007



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without

Review comment:
       Good point, will explore the option.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] alamb commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
alamb commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780516560



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without

Review comment:
       I think we could use the expression evaluator here rather than writing a giant match statement. 
   
   
   Something like the following 
   
   ```
   
       /// Multiply two numeric ScalarValues
       pub fn mul(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
           let props = ExecutionProps::new();
           let const_evaluator = ConstEvaluator::new(&props);
   
           // Create a LogicalPlan::Expr representing lhs * rhs
           let e = Expr::Literal(lhs.clone()) * Expr::Literal(rhs.clone());
           const_evaluator.evaluate_to_scalar(e)
       }
   ```
   
   Benefits: 
   1. require (much less) code
   2. have built in support for `Decimal` and other nice things
   
   Drawbacks
   1. It might be slower at runtime without some additional special case handling of ScalarValues, but we can do that as a follow on PR
   

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       I think this code is redundant with 
   
   ```
   pub(super) fn sum(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
   ```
   
   https://github.com/apache/arrow-datafusion/blob/9d3186693b614db57143adbd81c82a60752a8bac/datafusion/src/physical_plan/expressions/sum.rs#L266-L349
   

##########
File path: datafusion/src/scalar.rs
##########
@@ -3081,4 +3357,209 @@ mod tests {
             DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned()))
         );
     }
+
+    macro_rules! test_scalar_op {
+        ($OP:ident, $LHS:expr, $LHS_TYPE:ident, $RHS:expr, $RHS_TYPE:ident, $RESULT:expr, $RESULT_TYPE:ident) => {{
+            let v1 = &ScalarValue::from($LHS as $LHS_TYPE);
+            let v2 = &ScalarValue::from($RHS as $RHS_TYPE);
+            assert_eq!(
+                ScalarValue::$OP(v1, v2).unwrap(),
+                ScalarValue::from($RESULT as $RESULT_TYPE)
+            );
+        }};
+    }
+
+    macro_rules! test_scalar_op_err {
+        ($OP:ident, $LHS:expr, $LHS_TYPE:ident, $RHS:expr, $RHS_TYPE:ident) => {{
+            let v1 = &ScalarValue::from($LHS as $LHS_TYPE);
+            let v2 = &ScalarValue::from($RHS as $RHS_TYPE);
+            let actual = ScalarValue::add(v1, v2).is_err();

Review comment:
       I believe this should be referring to `$OP` rather than `add`:
   
   ```suggestion
               let actual = ScalarValue::$OP(v1, v2).is_err();
   ```

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without
+        // writing a hige match block.
+        // TODO: Add support for decimal types
+        match (lhs, rhs) {
+            (ScalarValue::Decimal128(_, _, _), _) |
+            (_, ScalarValue::Decimal128(_, _, _)) => {
+                Err(DataFusionError::Internal(
+                    "Addition with Decimals are not supported for now".to_string()
+                ))
+            },
+            // f64 / _
+            (ScalarValue::Float64(f1), ScalarValue::Float64(f2)) => {
+                Ok(ScalarValue::Float64(Some(f1.unwrap() + f2.unwrap())))

Review comment:
       These `unwrap`s will panic if either argument is `None` (aka represents NULL).

##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for Variance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(VarianceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "m2"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute variance

Review comment:
       I suggest bringing the algorithmic reference from the PR description into the code
   
   ```suggestion
   /// An accumulator to compute variance
   ///
   /// The algorithm used is an online implementation and numerically stable. It is based on this paper:
   /// Welford, B. P. (1962). "Note on a method for calculating corrected sums of squares and products". 
   /// Technometrics. 4 (3): 419–420. doi:10.2307/1266577. JSTOR 1266577.
   ///
   /// It has been analyzed here:
   /// Ling, Robert F. (1974). "Comparison of Several Algorithms for Computing Sample Means and Variances".
   /// Journal of the American Statistical Association. 69 (348): 859–866. doi:10.2307/2286154. JSTOR 2286154.
   ```

##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -84,9 +86,13 @@ pub use nth_value::NthValue;
 pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
 pub use rank::{dense_rank, percent_rank, rank};
 pub use row_number::RowNumber;
+pub(crate) use stddev::is_stddev_support_arg_type;
+pub use stddev::{stddev_return_type, Stddev};
 pub(crate) use sum::is_sum_support_arg_type;
 pub use sum::{sum_return_type, Sum};
 pub use try_cast::{try_cast, TryCastExpr};
+pub(crate) use variance::is_variance_support_arg_type;
+pub use variance::{variance_return_type, Variance};

Review comment:
       ```suggestion
   pub (crate) use variance::{variance_return_type, Variance};
   ```

##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+            data_type,
+        }
+    }
+}
+
+impl AggregateExpr for Stddev {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, self.data_type.clone(), true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(StddevAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![

Review comment:
       this is wonderful

##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -84,9 +86,13 @@ pub use nth_value::NthValue;
 pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
 pub use rank::{dense_rank, percent_rank, rank};
 pub use row_number::RowNumber;
+pub(crate) use stddev::is_stddev_support_arg_type;
+pub use stddev::{stddev_return_type, Stddev};

Review comment:
       Put another way, perhaps this could be 
   
   ```suggestion
   pub (crate) use stddev::{stddev_return_type, Stddev};
   ```

##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for Variance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(VarianceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "m2"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute variance
+#[derive(Debug)]
+pub struct VarianceAccumulator {
+    m2: ScalarValue,
+    mean: ScalarValue,
+    count: u64,
+}
+
+impl VarianceAccumulator {
+    /// Creates a new `VarianceAccumulator`
+    pub fn try_new() -> Result<Self> {
+        Ok(Self {
+            m2: ScalarValue::from(0 as f64),
+            mean: ScalarValue::from(0 as f64),
+            count: 0,
+        })
+    }
+
+    pub fn get_count(&self) -> u64 {
+        self.count
+    }
+
+    pub fn get_mean(&self) -> ScalarValue {
+        self.mean.clone()
+    }
+
+    pub fn get_m2(&self) -> ScalarValue {
+        self.m2.clone()
+    }
+}
+
+impl Accumulator for VarianceAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.count),
+            self.mean.clone(),
+            self.m2.clone(),
+        ])
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+        let values = &values[0];
+        let is_empty = values.is_null();
+
+        if !is_empty {
+            let new_count = self.count + 1;
+            let delta1 = ScalarValue::add(values, &self.mean.arithmetic_negate())?;

Review comment:
       Using `ScalarValue`s like this to accumulate each operation is likely to be quite slow during runtime. However, I think it would be fine to put in as a first initial implementation and then implement an optimized version using `update_batch` and arrow compute kernels as a follow on PR
   
   ```rust
       /// updates the accumulator's state from a vector of arrays.
       fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
   ```

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       yes that sounds good. Thank you




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno removed a comment on pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno removed a comment on pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#issuecomment-1007822391


   > This is a really nice piece of work @realno - thank you so much ❤️
   > 
   > I especially like the thorough testing.
   > 
   > I did not review the referenced papers, but I did run some basic tests against postgres and I got some strange results
   > 
   > ```shell
   > cargo run datafusion-cli
   > ```
   > 
   > Sometimes the answers are different from query to query
   > 
   > ```
   > ❯ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
   > +--------------------+
   > | STDDEV(sq.column1) |
   > +--------------------+
   > | 0.7760297817881877 |
   > +--------------------+
   > 1 row in set. Query took 0.008 seconds.
   > ❯ select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
   > +--------------------+
   > | STDDEV(sq.column1) |
   > +--------------------+
   > | NaN                |
   > +--------------------+
   > ```
   > 
   > And neither of those answers matches postgres:
   > 
   > ```
   > alamb=# select stddev(sq.column1) from (values (1.1), (2.0), (3.0)) as sq;
   >          stddev         
   > ------------------------
   >  0.95043849529221686157
   > (1 row)
   > ```
   > 
   > Postgres and google sheets do match <img alt="Screen Shot 2022-01-07 at 4 15 53 PM" width="368" src="https://user-images.githubusercontent.com/490673/148608489-9e64e81c-a627-4991-b3b5-079f19030c16.png">
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780443231



##########
File path: datafusion/src/physical_plan/expressions/stddev.rs
##########
@@ -0,0 +1,312 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{
+    expressions::variance::VarianceAccumulator, Accumulator, AggregateExpr, PhysicalExpr,
+};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// STDDEV (standard deviation) aggregate expression
+#[derive(Debug)]
+pub struct Stddev {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+    data_type: DataType,
+}
+
+/// function return type of standard deviation
+pub fn stddev_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "STDDEV does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_stddev_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Stddev {
+    /// Create a new STDDEV aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of stddev just support FLOAT64 and Decimal data type.

Review comment:
       Yes, we should support decimal according to spec. I am glad to see you are working on adding decimal - which is the reason I left it out at the moment. Please let me know once it's merged I am happy to open another PR to add it. Once this PR is merged I will create an issue to track this. 

##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));

Review comment:
       Please see reply from stddev. Will create new PR once decimal support is added.

##########
File path: datafusion/src/physical_plan/expressions/variance.rs
##########
@@ -0,0 +1,376 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Defines physical expressions that can evaluated at runtime during query execution
+
+use std::any::Any;
+use std::sync::Arc;
+
+use crate::error::{DataFusionError, Result};
+use crate::physical_plan::{Accumulator, AggregateExpr, PhysicalExpr};
+use crate::scalar::ScalarValue;
+use arrow::datatypes::DataType;
+use arrow::datatypes::Field;
+
+use super::format_state_name;
+
+/// VARIANCE aggregate expression
+#[derive(Debug)]
+pub struct Variance {
+    name: String,
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+/// function return type of variance
+pub fn variance_return_type(arg_type: &DataType) -> Result<DataType> {
+    match arg_type {
+        DataType::Int8
+        | DataType::Int16
+        | DataType::Int32
+        | DataType::Int64
+        | DataType::UInt8
+        | DataType::UInt16
+        | DataType::UInt32
+        | DataType::UInt64
+        | DataType::Float32
+        | DataType::Float64 => Ok(DataType::Float64),
+        other => Err(DataFusionError::Plan(format!(
+            "VARIANCE does not support {:?}",
+            other
+        ))),
+    }
+}
+
+pub(crate) fn is_variance_support_arg_type(arg_type: &DataType) -> bool {
+    matches!(
+        arg_type,
+        DataType::UInt8
+            | DataType::UInt16
+            | DataType::UInt32
+            | DataType::UInt64
+            | DataType::Int8
+            | DataType::Int16
+            | DataType::Int32
+            | DataType::Int64
+            | DataType::Float32
+            | DataType::Float64
+    )
+}
+
+impl Variance {
+    /// Create a new VARIANCE aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        data_type: DataType,
+    ) -> Self {
+        // the result of variance just support FLOAT64 data type.
+        assert!(matches!(data_type, DataType::Float64));
+        Self {
+            name: name.into(),
+            expr,
+        }
+    }
+}
+
+impl AggregateExpr for Variance {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(&self.name, DataType::Float64, true))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(VarianceAccumulator::try_new()?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![
+            Field::new(
+                &format_state_name(&self.name, "count"),
+                DataType::UInt64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "mean"),
+                DataType::Float64,
+                true,
+            ),
+            Field::new(
+                &format_state_name(&self.name, "m2"),
+                DataType::Float64,
+                true,
+            ),
+        ])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+/// An accumulator to compute variance
+#[derive(Debug)]
+pub struct VarianceAccumulator {
+    m2: ScalarValue,
+    mean: ScalarValue,
+    count: u64,
+}
+
+impl VarianceAccumulator {
+    /// Creates a new `VarianceAccumulator`
+    pub fn try_new() -> Result<Self> {
+        Ok(Self {
+            m2: ScalarValue::from(0 as f64),
+            mean: ScalarValue::from(0 as f64),
+            count: 0,
+        })
+    }
+
+    pub fn get_count(&self) -> u64 {
+        self.count
+    }
+
+    pub fn get_mean(&self) -> ScalarValue {
+        self.mean.clone()
+    }
+
+    pub fn get_m2(&self) -> ScalarValue {
+        self.m2.clone()
+    }
+}
+
+impl Accumulator for VarianceAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.count),
+            self.mean.clone(),
+            self.m2.clone(),
+        ])
+    }
+
+    fn update(&mut self, values: &[ScalarValue]) -> Result<()> {
+        let values = &values[0];
+        let is_empty = values.is_null();
+
+        if !is_empty {
+            let new_count = self.count + 1;
+            let delta1 = ScalarValue::add(values, &self.mean.arithmetic_negate())?;

Review comment:
       Completely agree. I have plan to investigate this as followup PR. The current challenge is the algorithm will loose parallelizability if using a batch friendly algorithm. And I need to spend more time to understand the code. One question I have is will there be a chance `update` and `batch_update` can be used in the same job, i.e. if one job can call `update` on some data and `batch_update` on some other data. Reason for that is the online version of the algorithm requires an intermediate value to be calculated so it is not compatible with batch mode, that is, we can only do all batch or all online. 

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       Completely agree. My feeling is these basic operations could be implemented in ScalarValue since many operators will depend on them. I want to pick your brain (and other contributor's too) if you think this is a good idea - this deserves a seperate discussion. Also with the way ScalarValue is implemented currently each operator needs a high match block, maybe using generics can make it a little easier? 
   
   Here are some options I propose:
   1. Remove the redundency, I can either move sum to Scalavalue or the other direction is fine. Personally I prefer to have it in ScalarValue.
   2. Leave the code as is until we have a discussion/conclusion around how to handle arithmetic operation for ScalarValue. 

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without

Review comment:
       Good point, will explore the option.

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       Completely agree. My feeling is these basic operations could be implemented in ScalarValue since many operators will depend on them. I want to pick your brain (and other contributor's too) if you think this is a good idea - this deserves a seperate discussion. Also with the way ScalarValue is implemented currently each operator needs a huge match block, maybe using generics can make it a little easier? 
   
   Here are some options I propose:
   1. Remove the redundency, I can either move sum to Scalavalue or the other direction is fine. Personally I prefer to have it in ScalarValue.
   2. Leave the code as is until we have a discussion/conclusion around how to handle arithmetic operation for ScalarValue. 

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       The different value from postgres and google is because of the nature of stddev, there are two versions: 1. population and 2. sample. The one in this PR is population, looks like the default one with postgres and google is sample. The difference in the calculation is very minimal I can include the sampe version in the PR as well. Good catch! 
   
   So my proposal is to add two functions: stddev_pop and stddev_s, does this look reasonable? @alamb 
   
   And I will look into the inconsistency between query runs. Thanks! 

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       The different value from postgres and google is because of the nature of stddev, there are two versions: 1. population and 2. sample. The one in this PR is population, looks like the default one with postgres and google is sample. The difference in the calculation is very minimal I can include the sampe version in the PR as well. Good catch! 
   
   So my proposal is to add two functions: stddev_pop and stddev_samp (following Postgres standard), and have stddev default to stddev_samp. Does this look reasonable? @alamb 
   
   And I will look into the inconsistency between query runs. Thanks! 

##########
File path: datafusion/src/physical_plan/expressions/mod.rs
##########
@@ -84,9 +86,13 @@ pub use nth_value::NthValue;
 pub use nullif::{nullif_func, SUPPORTED_NULLIF_TYPES};
 pub use rank::{dense_rank, percent_rank, rank};
 pub use row_number::RowNumber;
+pub(crate) use stddev::is_stddev_support_arg_type;
+pub use stddev::{stddev_return_type, Stddev};

Review comment:
       👍 

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without

Review comment:
       First of all, thanks for suggesting this approach, I was able to get it working after making `evaluate_to_scalar` to `pub(crate)`, it will reduce the amount of code significantly. 
   
   Though after some thinking I am leaning towards having a separate discussion and make the change in a different PR. Here are some of my thoughts:
   1. It requires changing the scope of an internal function
   2. As you mentioned, there are quite a bit of performance overhead
   3. I think ScalarValue arithmetic is important to many future operators, we can take the chance to provide official support. It feels right to formally discuss the topic and review if this is the best way to achieve it. 
   4. This method doesn't support uptype when overflow happens. This is useful because for aggregation functions it is quite common the results requires more accuracy than the original type. 
   5. Some other functions such as `sum` and `avg` also have similar implementations. Though this is not a good reason to keep as is but it shows the value of official Scalar arithmetic support.
   
   Please let me know your thoughts. If makes sense I am happy to create a new issue for the work and join the discussion. Thanks! 

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without
+        // writing a hige match block.
+        // TODO: Add support for decimal types
+        match (lhs, rhs) {
+            (ScalarValue::Decimal128(_, _, _), _) |
+            (_, ScalarValue::Decimal128(_, _, _)) => {
+                Err(DataFusionError::Internal(
+                    "Addition with Decimals are not supported for now".to_string()
+                ))
+            },
+            // f64 / _
+            (ScalarValue::Float64(f1), ScalarValue::Float64(f2)) => {
+                Ok(ScalarValue::Float64(Some(f1.unwrap() + f2.unwrap())))

Review comment:
       Added checkes for `None` values.

##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {

Review comment:
       The inconsistency between query runs are now fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno closed pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno closed pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow-datafusion] realno commented on a change in pull request #1525: Add stddev operator

Posted by GitBox <gi...@apache.org>.
realno commented on a change in pull request #1525:
URL: https://github.com/apache/arrow-datafusion/pull/1525#discussion_r780630182



##########
File path: datafusion/src/scalar.rs
##########
@@ -526,6 +526,282 @@ macro_rules! eq_array_primitive {
 }
 
 impl ScalarValue {
+    /// Return true if the value is numeric
+    pub fn is_numeric(&self) -> bool {
+        matches!(self,
+            ScalarValue::Float32(_)
+            | ScalarValue::Float64(_)
+            | ScalarValue::Decimal128(_, _, _)
+            | ScalarValue::Int8(_)
+            | ScalarValue::Int16(_)
+            | ScalarValue::Int32(_)
+            | ScalarValue::Int64(_)
+            | ScalarValue::UInt8(_)
+            | ScalarValue::UInt16(_)
+            | ScalarValue::UInt32(_)
+            | ScalarValue::UInt64(_)
+        )
+    }
+
+    /// Add two numeric ScalarValues
+    pub fn add(lhs: &ScalarValue, rhs: &ScalarValue) -> Result<ScalarValue> {
+        if !lhs.is_numeric() || !rhs.is_numeric() {
+            return Err(DataFusionError::Internal(format!(
+                "Addition only supports numeric types, \
+                    here has  {:?} and {:?}",
+                lhs.get_datatype(),
+                rhs.get_datatype()
+            )));
+        }
+
+        // TODO: Finding a good way to support operation between different types without
+        // writing a hige match block.
+        // TODO: Add support for decimal types
+        match (lhs, rhs) {
+            (ScalarValue::Decimal128(_, _, _), _) |
+            (_, ScalarValue::Decimal128(_, _, _)) => {
+                Err(DataFusionError::Internal(
+                    "Addition with Decimals are not supported for now".to_string()
+                ))
+            },
+            // f64 / _
+            (ScalarValue::Float64(f1), ScalarValue::Float64(f2)) => {
+                Ok(ScalarValue::Float64(Some(f1.unwrap() + f2.unwrap())))

Review comment:
       Added checkes for `None` values.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org