You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/30 13:57:11 UTC

[arrow-datafusion] branch master updated: refactor distinct_expressions.rs (#2386)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a20efe40 refactor distinct_expressions.rs (#2386)
7a20efe40 is described below

commit 7a20efe40b21182d33f5f3cf20ebb06cb3d11c24
Author: DuRipeng <45...@qq.com>
AuthorDate: Sat Apr 30 21:57:04 2022 +0800

    refactor distinct_expressions.rs (#2386)
---
 .../src/aggregate/array_agg_distinct.rs            | 308 ++++++++++++++++
 .../{distinct_expressions.rs => count_distinct.rs} | 388 +++------------------
 datafusion/physical-expr/src/aggregate/mod.rs      |  12 +-
 datafusion/physical-expr/src/expressions/mod.rs    |   3 +-
 4 files changed, 375 insertions(+), 336 deletions(-)

diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
new file mode 100644
index 000000000..dfe19e0eb
--- /dev/null
+++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Implementations for DISTINCT expressions, e.g. `COUNT(DISTINCT c)`
+
+use super::*;
+use arrow::datatypes::{DataType, Field};
+use std::any::Any;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow::array::{Array, ArrayRef};
+use std::collections::HashSet;
+
+use crate::{AggregateExpr, PhysicalExpr};
+use datafusion_common::Result;
+use datafusion_common::ScalarValue;
+use datafusion_expr::Accumulator;
+
+/// Expression for a ARRAY_AGG(DISTINCT) aggregation.
+#[derive(Debug)]
+pub struct DistinctArrayAgg {
+    /// Column name
+    name: String,
+    /// The DataType for the input expression
+    input_data_type: DataType,
+    /// The input expression
+    expr: Arc<dyn PhysicalExpr>,
+}
+
+impl DistinctArrayAgg {
+    /// Create a new DistinctArrayAgg aggregate function
+    pub fn new(
+        expr: Arc<dyn PhysicalExpr>,
+        name: impl Into<String>,
+        input_data_type: DataType,
+    ) -> Self {
+        let name = name.into();
+        Self {
+            name,
+            expr,
+            input_data_type,
+        }
+    }
+}
+
+impl AggregateExpr for DistinctArrayAgg {
+    /// Return a reference to Any that can be used for downcasting
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+
+    fn field(&self) -> Result<Field> {
+        Ok(Field::new(
+            &self.name,
+            DataType::List(Box::new(Field::new(
+                "item",
+                self.input_data_type.clone(),
+                true,
+            ))),
+            false,
+        ))
+    }
+
+    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
+        Ok(Box::new(DistinctArrayAggAccumulator::try_new(
+            &self.input_data_type,
+        )?))
+    }
+
+    fn state_fields(&self) -> Result<Vec<Field>> {
+        Ok(vec![Field::new(
+            &format_state_name(&self.name, "distinct_array_agg"),
+            DataType::List(Box::new(Field::new(
+                "item",
+                self.input_data_type.clone(),
+                true,
+            ))),
+            false,
+        )])
+    }
+
+    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
+        vec![self.expr.clone()]
+    }
+
+    fn name(&self) -> &str {
+        &self.name
+    }
+}
+
+#[derive(Debug)]
+struct DistinctArrayAggAccumulator {
+    values: HashSet<ScalarValue>,
+    datatype: DataType,
+}
+
+impl DistinctArrayAggAccumulator {
+    pub fn try_new(datatype: &DataType) -> Result<Self> {
+        Ok(Self {
+            values: HashSet::new(),
+            datatype: datatype.clone(),
+        })
+    }
+}
+
+impl Accumulator for DistinctArrayAggAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![ScalarValue::List(
+            Some(Box::new(self.values.clone().into_iter().collect())),
+            Box::new(self.datatype.clone()),
+        )])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        assert_eq!(values.len(), 1, "batch input should only include 1 column!");
+
+        let arr = &values[0];
+        for i in 0..arr.len() {
+            self.values.insert(ScalarValue::try_from_array(arr, i)?);
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        };
+
+        for array in states {
+            for j in 0..array.len() {
+                self.values.insert(ScalarValue::try_from_array(array, j)?);
+            }
+        }
+
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::List(
+            Some(Box::new(self.values.clone().into_iter().collect())),
+            Box::new(self.datatype.clone()),
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::expressions::col;
+    use crate::expressions::tests::aggregate;
+    use arrow::array::{ArrayRef, Int32Array};
+    use arrow::datatypes::{DataType, Schema};
+    use arrow::record_batch::RecordBatch;
+
+    fn check_distinct_array_agg(
+        input: ArrayRef,
+        expected: ScalarValue,
+        datatype: DataType,
+    ) -> Result<()> {
+        let schema = Schema::new(vec![Field::new("a", datatype.clone(), false)]);
+        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![input])?;
+
+        let agg = Arc::new(DistinctArrayAgg::new(
+            col("a", &schema)?,
+            "bla".to_string(),
+            datatype,
+        ));
+        let actual = aggregate(&batch, agg)?;
+
+        match (expected, actual) {
+            (ScalarValue::List(Some(mut e), _), ScalarValue::List(Some(mut a), _)) => {
+                // workaround lack of Ord of ScalarValue
+                let cmp = |a: &ScalarValue, b: &ScalarValue| {
+                    a.partial_cmp(b).expect("Can compare ScalarValues")
+                };
+
+                e.sort_by(cmp);
+                a.sort_by(cmp);
+                // Check that the inputs are the same
+                assert_eq!(e, a);
+            }
+            _ => {
+                unreachable!()
+            }
+        }
+
+        Ok(())
+    }
+
+    #[test]
+    fn distinct_array_agg_i32() -> Result<()> {
+        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 4, 5, 2]));
+
+        let out = ScalarValue::List(
+            Some(Box::new(vec![
+                ScalarValue::Int32(Some(1)),
+                ScalarValue::Int32(Some(2)),
+                ScalarValue::Int32(Some(7)),
+                ScalarValue::Int32(Some(4)),
+                ScalarValue::Int32(Some(5)),
+            ])),
+            Box::new(DataType::Int32),
+        );
+
+        check_distinct_array_agg(col, out, DataType::Int32)
+    }
+
+    #[test]
+    fn distinct_array_agg_nested() -> Result<()> {
+        // [[1, 2, 3], [4, 5]]
+        let l1 = ScalarValue::List(
+            Some(Box::new(vec![
+                ScalarValue::List(
+                    Some(Box::new(vec![
+                        ScalarValue::from(1i32),
+                        ScalarValue::from(2i32),
+                        ScalarValue::from(3i32),
+                    ])),
+                    Box::new(DataType::Int32),
+                ),
+                ScalarValue::List(
+                    Some(Box::new(vec![
+                        ScalarValue::from(4i32),
+                        ScalarValue::from(5i32),
+                    ])),
+                    Box::new(DataType::Int32),
+                ),
+            ])),
+            Box::new(DataType::List(Box::new(Field::new(
+                "item",
+                DataType::Int32,
+                true,
+            )))),
+        );
+
+        // [[6], [7, 8]]
+        let l2 = ScalarValue::List(
+            Some(Box::new(vec![
+                ScalarValue::List(
+                    Some(Box::new(vec![ScalarValue::from(6i32)])),
+                    Box::new(DataType::Int32),
+                ),
+                ScalarValue::List(
+                    Some(Box::new(vec![
+                        ScalarValue::from(7i32),
+                        ScalarValue::from(8i32),
+                    ])),
+                    Box::new(DataType::Int32),
+                ),
+            ])),
+            Box::new(DataType::List(Box::new(Field::new(
+                "item",
+                DataType::Int32,
+                true,
+            )))),
+        );
+
+        // [[9]]
+        let l3 = ScalarValue::List(
+            Some(Box::new(vec![ScalarValue::List(
+                Some(Box::new(vec![ScalarValue::from(9i32)])),
+                Box::new(DataType::Int32),
+            )])),
+            Box::new(DataType::List(Box::new(Field::new(
+                "item",
+                DataType::Int32,
+                true,
+            )))),
+        );
+
+        let list = ScalarValue::List(
+            Some(Box::new(vec![l1.clone(), l2.clone(), l3.clone()])),
+            Box::new(DataType::List(Box::new(Field::new(
+                "item",
+                DataType::Int32,
+                true,
+            )))),
+        );
+
+        // Duplicate l1 in the input array and check that it is deduped in the output.
+        let array = ScalarValue::iter_to_array(vec![l1.clone(), l2, l3, l1]).unwrap();
+
+        check_distinct_array_agg(
+            array,
+            list,
+            DataType::List(Box::new(Field::new(
+                "item",
+                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
+                true,
+            ))),
+        )
+    }
+}
diff --git a/datafusion/physical-expr/src/aggregate/distinct_expressions.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs
similarity index 74%
rename from datafusion/physical-expr/src/aggregate/distinct_expressions.rs
rename to datafusion/physical-expr/src/aggregate/count_distinct.rs
index e4f1e01e3..cb32dcd49 100644
--- a/datafusion/physical-expr/src/aggregate/distinct_expressions.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -15,12 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Implementations for DISTINCT expressions, e.g. `COUNT(DISTINCT c)`
-
+use super::*;
 use arrow::datatypes::{DataType, Field};
 use std::any::Any;
 use std::fmt::Debug;
-use std::hash::Hash;
 use std::sync::Arc;
 
 use ahash::RandomState;
@@ -32,13 +30,6 @@ use datafusion_common::ScalarValue;
 use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
-#[derive(Debug, PartialEq, Eq, Hash, Clone)]
-struct DistinctScalarValues(Vec<ScalarValue>);
-
-fn format_state_name(name: &str, state_name: &str) -> String {
-    format!("{}[{}]", name, state_name)
-}
-
 /// Expression for a COUNT(DISTINCT) aggregation.
 #[derive(Debug)]
 pub struct DistinctCount {
@@ -233,146 +224,44 @@ impl Accumulator for DistinctCountAccumulator {
     }
 }
 
-/// Expression for a ARRAY_AGG(DISTINCT) aggregation.
-#[derive(Debug)]
-pub struct DistinctArrayAgg {
-    /// Column name
-    name: String,
-    /// The DataType for the input expression
-    input_data_type: DataType,
-    /// The input expression
-    expr: Arc<dyn PhysicalExpr>,
-}
-
-impl DistinctArrayAgg {
-    /// Create a new DistinctArrayAgg aggregate function
-    pub fn new(
-        expr: Arc<dyn PhysicalExpr>,
-        name: impl Into<String>,
-        input_data_type: DataType,
-    ) -> Self {
-        let name = name.into();
-        Self {
-            name,
-            expr,
-            input_data_type,
-        }
-    }
-}
-
-impl AggregateExpr for DistinctArrayAgg {
-    /// Return a reference to Any that can be used for downcasting
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn field(&self) -> Result<Field> {
-        Ok(Field::new(
-            &self.name,
-            DataType::List(Box::new(Field::new(
-                "item",
-                self.input_data_type.clone(),
-                true,
-            ))),
-            false,
-        ))
-    }
-
-    fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(DistinctArrayAggAccumulator::try_new(
-            &self.input_data_type,
-        )?))
-    }
-
-    fn state_fields(&self) -> Result<Vec<Field>> {
-        Ok(vec![Field::new(
-            &format_state_name(&self.name, "distinct_array_agg"),
-            DataType::List(Box::new(Field::new(
-                "item",
-                self.input_data_type.clone(),
-                true,
-            ))),
-            false,
-        )])
-    }
-
-    fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>> {
-        vec![self.expr.clone()]
-    }
-
-    fn name(&self) -> &str {
-        &self.name
-    }
-}
-
-#[derive(Debug)]
-struct DistinctArrayAggAccumulator {
-    values: HashSet<ScalarValue>,
-    datatype: DataType,
-}
-
-impl DistinctArrayAggAccumulator {
-    pub fn try_new(datatype: &DataType) -> Result<Self> {
-        Ok(Self {
-            values: HashSet::new(),
-            datatype: datatype.clone(),
-        })
-    }
-}
-
-impl Accumulator for DistinctArrayAggAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![ScalarValue::List(
-            Some(Box::new(self.values.clone().into_iter().collect())),
-            Box::new(self.datatype.clone()),
-        )])
-    }
-
-    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        assert_eq!(values.len(), 1, "batch input should only include 1 column!");
-
-        let arr = &values[0];
-        for i in 0..arr.len() {
-            self.values.insert(ScalarValue::try_from_array(arr, i)?);
-        }
-        Ok(())
-    }
-
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
-        };
-
-        for array in states {
-            for j in 0..array.len() {
-                self.values.insert(ScalarValue::try_from_array(array, j)?);
-            }
-        }
-
-        Ok(())
-    }
-
-    fn evaluate(&self) -> Result<ScalarValue> {
-        Ok(ScalarValue::List(
-            Some(Box::new(self.values.clone().into_iter().collect())),
-            Box::new(self.datatype.clone()),
-        ))
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::col;
-    use crate::expressions::tests::aggregate;
     use arrow::array::{
         ArrayRef, BooleanArray, Float32Array, Float64Array, Int16Array, Int32Array,
         Int64Array, Int8Array, ListArray, UInt16Array, UInt32Array, UInt64Array,
         UInt8Array,
     };
     use arrow::array::{Int32Builder, ListBuilder, UInt64Builder};
-    use arrow::datatypes::{DataType, Schema};
-    use arrow::record_batch::RecordBatch;
+    use arrow::datatypes::DataType;
+
+    macro_rules! state_to_vec {
+        ($LIST:expr, $DATA_TYPE:ident, $PRIM_TY:ty) => {{
+            match $LIST {
+                ScalarValue::List(_, data_type) => match data_type.as_ref() {
+                    &DataType::$DATA_TYPE => (),
+                    _ => panic!("Unexpected DataType for list"),
+                },
+                _ => panic!("Expected a ScalarValue::List"),
+            }
+
+            match $LIST {
+                ScalarValue::List(None, _) => None,
+                ScalarValue::List(Some(scalar_values), _) => {
+                    let vec = scalar_values
+                        .iter()
+                        .map(|scalar_value| match scalar_value {
+                            ScalarValue::$DATA_TYPE(value) => *value,
+                            _ => panic!("Unexpected ScalarValue variant"),
+                        })
+                        .collect::<Vec<Option<$PRIM_TY>>>();
+
+                    Some(vec)
+                }
+                _ => unreachable!(),
+            }
+        }};
+    }
 
     macro_rules! build_list {
         ($LISTS:expr, $BUILDER_TYPE:ident) => {{
@@ -401,31 +290,33 @@ mod tests {
         }};
     }
 
-    macro_rules! state_to_vec {
-        ($LIST:expr, $DATA_TYPE:ident, $PRIM_TY:ty) => {{
-            match $LIST {
-                ScalarValue::List(_, data_type) => match data_type.as_ref() {
-                    &DataType::$DATA_TYPE => (),
-                    _ => panic!("Unexpected DataType for list"),
-                },
-                _ => panic!("Expected a ScalarValue::List"),
-            }
+    macro_rules! test_count_distinct_update_batch_numeric {
+        ($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{
+            let values: Vec<Option<$PRIM_TYPE>> = vec![
+                Some(1),
+                Some(1),
+                None,
+                Some(3),
+                Some(2),
+                None,
+                Some(2),
+                Some(3),
+                Some(1),
+            ];
 
-            match $LIST {
-                ScalarValue::List(None, _) => None,
-                ScalarValue::List(Some(scalar_values), _) => {
-                    let vec = scalar_values
-                        .iter()
-                        .map(|scalar_value| match scalar_value {
-                            ScalarValue::$DATA_TYPE(value) => *value,
-                            _ => panic!("Unexpected ScalarValue variant"),
-                        })
-                        .collect::<Vec<Option<$PRIM_TY>>>();
+            let arrays = vec![Arc::new($ARRAY_TYPE::from(values)) as ArrayRef];
 
-                    Some(vec)
-                }
-                _ => unreachable!(),
-            }
+            let (states, result) = run_update_batch(&arrays)?;
+
+            let mut state_vec =
+                state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap();
+            state_vec.sort();
+
+            assert_eq!(states.len(), 1);
+            assert_eq!(state_vec, vec![Some(1), Some(2), Some(3)]);
+            assert_eq!(result, ScalarValue::UInt64(Some(3)));
+
+            Ok(())
         }};
     }
 
@@ -508,36 +399,6 @@ mod tests {
         Ok((accum.state()?, accum.evaluate()?))
     }
 
-    macro_rules! test_count_distinct_update_batch_numeric {
-        ($ARRAY_TYPE:ident, $DATA_TYPE:ident, $PRIM_TYPE:ty) => {{
-            let values: Vec<Option<$PRIM_TYPE>> = vec![
-                Some(1),
-                Some(1),
-                None,
-                Some(3),
-                Some(2),
-                None,
-                Some(2),
-                Some(3),
-                Some(1),
-            ];
-
-            let arrays = vec![Arc::new($ARRAY_TYPE::from(values)) as ArrayRef];
-
-            let (states, result) = run_update_batch(&arrays)?;
-
-            let mut state_vec =
-                state_to_vec!(&states[0], $DATA_TYPE, $PRIM_TYPE).unwrap();
-            state_vec.sort();
-
-            assert_eq!(states.len(), 1);
-            assert_eq!(state_vec, vec![Some(1), Some(2), Some(3)]);
-            assert_eq!(result, ScalarValue::UInt64(Some(3)));
-
-            Ok(())
-        }};
-    }
-
     // Used trait to create associated constant for f32 and f64
     trait SubNormal: 'static {
         const SUBNORMAL: Self;
@@ -870,143 +731,4 @@ mod tests {
 
         Ok(())
     }
-
-    fn check_distinct_array_agg(
-        input: ArrayRef,
-        expected: ScalarValue,
-        datatype: DataType,
-    ) -> Result<()> {
-        let schema = Schema::new(vec![Field::new("a", datatype.clone(), false)]);
-        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![input])?;
-
-        let agg = Arc::new(DistinctArrayAgg::new(
-            col("a", &schema)?,
-            "bla".to_string(),
-            datatype,
-        ));
-        let actual = aggregate(&batch, agg)?;
-
-        match (expected, actual) {
-            (ScalarValue::List(Some(mut e), _), ScalarValue::List(Some(mut a), _)) => {
-                // workaround lack of Ord of ScalarValue
-                let cmp = |a: &ScalarValue, b: &ScalarValue| {
-                    a.partial_cmp(b).expect("Can compare ScalarValues")
-                };
-
-                e.sort_by(cmp);
-                a.sort_by(cmp);
-                // Check that the inputs are the same
-                assert_eq!(e, a);
-            }
-            _ => {
-                unreachable!()
-            }
-        }
-
-        Ok(())
-    }
-
-    #[test]
-    fn distinct_array_agg_i32() -> Result<()> {
-        let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 7, 4, 5, 2]));
-
-        let out = ScalarValue::List(
-            Some(Box::new(vec![
-                ScalarValue::Int32(Some(1)),
-                ScalarValue::Int32(Some(2)),
-                ScalarValue::Int32(Some(7)),
-                ScalarValue::Int32(Some(4)),
-                ScalarValue::Int32(Some(5)),
-            ])),
-            Box::new(DataType::Int32),
-        );
-
-        check_distinct_array_agg(col, out, DataType::Int32)
-    }
-
-    #[test]
-    fn distinct_array_agg_nested() -> Result<()> {
-        // [[1, 2, 3], [4, 5]]
-        let l1 = ScalarValue::List(
-            Some(Box::new(vec![
-                ScalarValue::List(
-                    Some(Box::new(vec![
-                        ScalarValue::from(1i32),
-                        ScalarValue::from(2i32),
-                        ScalarValue::from(3i32),
-                    ])),
-                    Box::new(DataType::Int32),
-                ),
-                ScalarValue::List(
-                    Some(Box::new(vec![
-                        ScalarValue::from(4i32),
-                        ScalarValue::from(5i32),
-                    ])),
-                    Box::new(DataType::Int32),
-                ),
-            ])),
-            Box::new(DataType::List(Box::new(Field::new(
-                "item",
-                DataType::Int32,
-                true,
-            )))),
-        );
-
-        // [[6], [7, 8]]
-        let l2 = ScalarValue::List(
-            Some(Box::new(vec![
-                ScalarValue::List(
-                    Some(Box::new(vec![ScalarValue::from(6i32)])),
-                    Box::new(DataType::Int32),
-                ),
-                ScalarValue::List(
-                    Some(Box::new(vec![
-                        ScalarValue::from(7i32),
-                        ScalarValue::from(8i32),
-                    ])),
-                    Box::new(DataType::Int32),
-                ),
-            ])),
-            Box::new(DataType::List(Box::new(Field::new(
-                "item",
-                DataType::Int32,
-                true,
-            )))),
-        );
-
-        // [[9]]
-        let l3 = ScalarValue::List(
-            Some(Box::new(vec![ScalarValue::List(
-                Some(Box::new(vec![ScalarValue::from(9i32)])),
-                Box::new(DataType::Int32),
-            )])),
-            Box::new(DataType::List(Box::new(Field::new(
-                "item",
-                DataType::Int32,
-                true,
-            )))),
-        );
-
-        let list = ScalarValue::List(
-            Some(Box::new(vec![l1.clone(), l2.clone(), l3.clone()])),
-            Box::new(DataType::List(Box::new(Field::new(
-                "item",
-                DataType::Int32,
-                true,
-            )))),
-        );
-
-        // Duplicate l1 in the input array and check that it is deduped in the output.
-        let array = ScalarValue::iter_to_array(vec![l1.clone(), l2, l3, l1]).unwrap();
-
-        check_distinct_array_agg(
-            array,
-            list,
-            DataType::List(Box::new(Field::new(
-                "item",
-                DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
-                true,
-            ))),
-        )
-    }
 }
diff --git a/datafusion/physical-expr/src/aggregate/mod.rs b/datafusion/physical-expr/src/aggregate/mod.rs
index 019a60cd5..a9f3167c4 100644
--- a/datafusion/physical-expr/src/aggregate/mod.rs
+++ b/datafusion/physical-expr/src/aggregate/mod.rs
@@ -17,7 +17,7 @@
 
 use crate::PhysicalExpr;
 use arrow::datatypes::Field;
-use datafusion_common::Result;
+use datafusion_common::{Result, ScalarValue};
 use datafusion_expr::Accumulator;
 use std::any::Any;
 use std::fmt::Debug;
@@ -28,12 +28,13 @@ pub(crate) mod approx_median;
 pub(crate) mod approx_percentile_cont;
 pub(crate) mod approx_percentile_cont_with_weight;
 pub(crate) mod array_agg;
+pub(crate) mod array_agg_distinct;
 pub(crate) mod average;
 pub(crate) mod coercion_rule;
 pub(crate) mod correlation;
 pub(crate) mod count;
+pub(crate) mod count_distinct;
 pub(crate) mod covariance;
-pub(crate) mod distinct_expressions;
 #[macro_use]
 pub(crate) mod min_max;
 pub mod build_in;
@@ -76,3 +77,10 @@ pub trait AggregateExpr: Send + Sync + Debug {
         "AggregateExpr: default name"
     }
 }
+
+#[derive(Debug, PartialEq, Eq, Hash, Clone)]
+struct DistinctScalarValues(Vec<ScalarValue>);
+
+fn format_state_name(name: &str, state_name: &str) -> String {
+    format!("{}[{}]", name, state_name)
+}
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index 813e87c8f..3190f680f 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -42,12 +42,13 @@ pub use crate::aggregate::approx_median::ApproxMedian;
 pub use crate::aggregate::approx_percentile_cont::ApproxPercentileCont;
 pub use crate::aggregate::approx_percentile_cont_with_weight::ApproxPercentileContWithWeight;
 pub use crate::aggregate::array_agg::ArrayAgg;
+pub use crate::aggregate::array_agg_distinct::DistinctArrayAgg;
 pub use crate::aggregate::average::{Avg, AvgAccumulator};
 pub use crate::aggregate::build_in::create_aggregate_expr;
 pub use crate::aggregate::correlation::Correlation;
 pub use crate::aggregate::count::Count;
+pub use crate::aggregate::count_distinct::DistinctCount;
 pub use crate::aggregate::covariance::{Covariance, CovariancePop};
-pub use crate::aggregate::distinct_expressions::{DistinctArrayAgg, DistinctCount};
 pub use crate::aggregate::min_max::{Max, Min};
 pub use crate::aggregate::min_max::{MaxAccumulator, MinAccumulator};
 pub use crate::aggregate::stats::StatsType;