You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "jaylmiller (via GitHub)" <gi...@apache.org> on 2023/03/11 22:05:21 UTC

[GitHub] [arrow-datafusion] jaylmiller opened a new pull request, #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

jaylmiller opened a new pull request, #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554

   # Which issue does this PR close?
   Closes #258.
   
   # Rationale for this change
   The count distinct physical expr was doing alot of unnecessary hashing when it is ran on dictionary types. Instead, we can just keep track of seen indices (keys) with an array and no hashing is required.
   
   # What changes are included in this PR?
   A new accumulator (`CountDistinctDictAccumulator`) that is returned by `DistinctCount` in the case that a dictionary array is being counted. If it is not a dictionary array, just fall back to the default accumulator (`DistinctCountAccumulator`)
   
   # Are these changes tested?
   Added some new unit tests.
   
   # Are there any user-facing changes?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1476304363

   @mingmwang Sorry for delay. I haven't had a chance to get back to this PR yet (currently working on #5292). 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] waynexia commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "waynexia (via GitHub)" <gi...@apache.org>.
waynexia commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1468157538

   Sorry for the delay, I plan to review it tomorrow!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] comphead commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "comphead (via GitHub)" <gi...@apache.org>.
comphead commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1466981090

   @jaylmiller thanks for the PR. Would be great to get some knowloedge how the much performance increased? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1465259556

   @waynexia thanks for correcting my assumption about how normalized dicts 😀. I've made changes correcting this. There is a bit of hashing required but significantly less than before since we only need to hash once for each value type (instead of hashing every cell). 
   
   Also since there was now alot of shared logic between the 2 accumulators, i've pulled that out into funcs so both accumulators can use it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] comphead commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "comphead (via GitHub)" <gi...@apache.org>.
comphead commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1134637197


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +219,98 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new(values_datatype: DataType) -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            values: Default::default(),
+            values_datatype,
         }
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        values_to_state(&self.values, &self.values_datatype)
+    }
 
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Unexpected accumulator state".into(),
-                ));
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let arr = as_dictionary_array::<K>(&values[0])?;
+        let nvalues = arr.values().len();
+        // map keys to whether their corresponding value has been seen or not
+        let mut seen_map = (0..nvalues).map(|_| false).collect::<Vec<_>>();
+        for idx in arr.keys_iter().flatten() {
+            seen_map[idx] = true;
+        }
+        for (idx, seen) in seen_map.into_iter().enumerate() {
+            if seen {

Review Comment:
   do you think if its possible to merge 2 loops into 1



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -85,64 +86,124 @@ impl AggregateExpr for DistinctCount {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(DistinctCountAccumulator {
-            values: HashSet::default(),
-            state_data_type: self.state_data_type.clone(),
-        }))
+        use arrow::datatypes;
+        use datatypes::DataType::*;
+
+        Ok(match &self.state_data_type {
+            Dictionary(key, val) if key.is_dictionary_key_type() => {
+                let val_type = *val.clone();
+                match **key {
+                    Int8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int16 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int16Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int32 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int32Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int64 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int64Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::UInt8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt16 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt16Type,
+                    >::new(val_type)),
+                    UInt32 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt32Type,
+                    >::new(val_type)),
+                    UInt64 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt64Type,
+                    >::new(val_type)),
+                    _ => {
+                        // just checked that datatype is a valid dict key type
+                        unreachable!()

Review Comment:
   we were trying as part of other tickets to get rid of panic!, unreachable, dangerous unwrap. Please return Err



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1468454047

   > I think some case in clickbench will be improved.
   
   Ok I'll look into getting some results on these cases
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1135497797


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -31,7 +32,7 @@ use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
 type DistinctScalarValues = ScalarValue;
-
+type ValueSet = HashSet<DistinctScalarValues, RandomState>;

Review Comment:
   I wonder what value these type aliases add. The extra indirection of `DistinctScalarValues` --> `ScalarValue` simply seems to make things more complicated 🤔 



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -85,64 +86,124 @@ impl AggregateExpr for DistinctCount {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(DistinctCountAccumulator {
-            values: HashSet::default(),
-            state_data_type: self.state_data_type.clone(),
-        }))
+        use arrow::datatypes;
+        use datatypes::DataType::*;
+
+        Ok(match &self.state_data_type {
+            Dictionary(key, val) if key.is_dictionary_key_type() => {
+                let val_type = *val.clone();
+                match **key {
+                    Int8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int16 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int16Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int32 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int32Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int64 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int64Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::UInt8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt16 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt16Type,
+                    >::new(val_type)),
+                    UInt32 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt32Type,
+                    >::new(val_type)),
+                    UInt64 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt64Type,
+                    >::new(val_type)),
+                    _ => {
+                        // just checked that datatype is a valid dict key type
+                        unreachable!()
+                    }
+                }
+            }
+            _ => Box::new(DistinctCountAccumulator {
+                values: HashSet::default(),
+                state_data_type: self.state_data_type.clone(),
+            }),
+        })
     }
 
     fn name(&self) -> &str {
         &self.name
     }
 }
 
-#[derive(Debug)]
-struct DistinctCountAccumulator {
-    values: HashSet<DistinctScalarValues, RandomState>,
-    state_data_type: DataType,
+// calculating the size of values hashset for fixed length values,
+// taking first batch size * number of batches.
+// This method is faster than full_size(), however it is not suitable for variable length
+// values like strings or complex types
+fn values_fixed_size(values: &ValueSet) -> usize {
+    (std::mem::size_of::<DistinctScalarValues>() * values.capacity())
+        + values
+            .iter()
+            .next()
+            .map(|vals| ScalarValue::size(vals) - std::mem::size_of_val(vals))
+            .unwrap_or(0)
+}
+// calculates the size as accurate as possible, call to this method is expensive

Review Comment:
   ```suggestion
   // calculates the size as accurate as possible, call to this method is expensive
   // but necessary to correctly account for variable length strings
   ```



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +219,98 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new(values_datatype: DataType) -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            values: Default::default(),
+            values_datatype,
         }
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        values_to_state(&self.values, &self.values_datatype)
+    }
 
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Unexpected accumulator state".into(),
-                ));
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let arr = as_dictionary_array::<K>(&values[0])?;
+        let nvalues = arr.values().len();
+        // map keys to whether their corresponding value has been seen or not
+        let mut seen_map = (0..nvalues).map(|_| false).collect::<Vec<_>>();

Review Comment:
   ```suggestion
           let mut seen_map = vec![(false; nvalues];
   ```



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +219,98 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new(values_datatype: DataType) -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            values: Default::default(),
+            values_datatype,
         }
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        values_to_state(&self.values, &self.values_datatype)
+    }
 
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Unexpected accumulator state".into(),
-                ));
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {

Review Comment:
   As an alternate construction, since it is only `update_batch` that varies by the  dictionary key type, I suspect you could make this PR quite a bit smaller by dispatching at runtime to an appropriate type of update_batch. However, that would require a dispatch on each batch, where this PR only requires a single dispatch during planning (at the expense of larger code)



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -577,4 +697,76 @@ mod tests {
         assert_eq!(result, ScalarValue::Int64(Some(2)));
         Ok(())
     }
+
+    #[test]
+    fn count_distinct_dict_update() -> Result<()> {
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        // value "b" is never used
+        let keys =
+            Int8Array::from_iter(vec![Some(0), Some(0), Some(0), Some(0), None, Some(2)]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        let agg = DistinctCount::new(
+            arrays[0].data_type().clone(),
+            Arc::new(NoOp::new()),
+            String::from("__col_name__"),
+        );
+        let mut accum = agg.create_accumulator()?;
+        accum.update_batch(&arrays)?;
+        // should evaluate to 2 since "b" never seen
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(2)));
+        // now update with a new batch that does use "b" (and non-normalized values)
+        let values = StringArray::from_iter_values(["b", "a", "c", "d"]);
+        let keys = Int8Array::from_iter(vec![Some(0), Some(0), None]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        accum.update_batch(&arrays)?;
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(3)));
+        Ok(())
+    }
+
+    #[test]
+    fn count_distinct_dict_merge() -> Result<()> {
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(0), Some(0), None]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        let agg = DistinctCount::new(
+            arrays[0].data_type().clone(),
+            Arc::new(NoOp::new()),
+            String::from("__col_name__"),
+        );
+        // create accum with 1 value seen
+        let mut accum = agg.create_accumulator()?;
+        accum.update_batch(&arrays)?;
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(1)));
+        // create accum with state that has seen "a" and "b" but not "c"
+        let values = StringArray::from_iter_values(["c", "b", "a"]);

Review Comment:
   👍 good call to use a different dictionary



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -85,64 +86,124 @@ impl AggregateExpr for DistinctCount {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(DistinctCountAccumulator {
-            values: HashSet::default(),
-            state_data_type: self.state_data_type.clone(),
-        }))
+        use arrow::datatypes;
+        use datatypes::DataType::*;
+
+        Ok(match &self.state_data_type {
+            Dictionary(key, val) if key.is_dictionary_key_type() => {
+                let val_type = *val.clone();
+                match **key {
+                    Int8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int16 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int16Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int32 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int32Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int64 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int64Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::UInt8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt16 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt16Type,
+                    >::new(val_type)),
+                    UInt32 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt32Type,
+                    >::new(val_type)),
+                    UInt64 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt64Type,
+                    >::new(val_type)),
+                    _ => {
+                        // just checked that datatype is a valid dict key type
+                        unreachable!()

Review Comment:
   Though to be clear, I do think various parts of the rust arrow implementation will panic if another type is used as a dictionary key. Being defensive and returning an internal error sounds like a good idea to me



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1142178135


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +220,96 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new(values_datatype: DataType) -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            values: Default::default(),
+            values_datatype,
         }
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        values_to_state(&self.values, &self.values_datatype)
+    }
 
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Unexpected accumulator state".into(),
-                ));
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let arr = as_dictionary_array::<K>(&values[0])?;
+        let nvalues = arr.values().len();
+        // map keys to whether their corresponding value has been seen or not
+        let mut seen_map = vec![false; nvalues];

Review Comment:
   Thanks for suggestion. Just to clarify, you mean using arrow bitmap, correct? Something like
   
   ```rs
   let mut seen_map = arrow::array::BooleanBufferBuilder::new(nvalues);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Improve performance of COUNT (distinct x) for dictionary columns #258 [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb closed pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258
URL: https://github.com/apache/arrow-datafusion/pull/5554


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1135762085


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -31,7 +32,7 @@ use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
 type DistinctScalarValues = ScalarValue;
-
+type ValueSet = HashSet<DistinctScalarValues, RandomState>;

Review Comment:
   I think maybe we remove `DistinctScalarValues` alias but keep `ValueSet`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1471913523

   Thanks I'll try that out


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] Improve performance of COUNT (distinct x) for dictionary columns #258 [arrow-datafusion]

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-2043641109

   Since this has been open for more than a year, closing it down. Feel free to reopen if/when you keep working on it. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1469003917

   ClickBench count distinct query when using dictionary columns is getting killed (this is on main) 🤔
   
   ```
   ❯ CREATE EXTERNAL TABLE hits_base
   STORED AS PARQUET
   LOCATION 'hits.parquet';
   0 rows in set. Query took 0.041 seconds.
   ❯ CREATE TABLE hits as
   select
     arrow_cast("UserID", 'Dictionary(Int32, Utf8)') as "UserID"
   FROM hits_base;
   
   0 rows in set. Query took 13.887 seconds.
   ❯ SELECT COUNT(DISTINCT "UserID") from hits;
   Killed
   ```
   
   "UserID" table is pretty high cardinality though: is there a better clickbench query/column pair to bench with? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1133280472


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -577,4 +746,106 @@ mod tests {
         assert_eq!(result, ScalarValue::Int64(Some(2)));
         Ok(())
     }
+
+    #[test]
+    fn count_distinct_dict_update() -> Result<()> {
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        // value "b" is never used
+        let keys =
+            Int8Array::from_iter(vec![Some(0), Some(0), Some(0), Some(0), None, Some(2)]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        let agg = DistinctCount::new(
+            arrays[0].data_type().clone(),
+            Arc::new(NoOp::new()),
+            String::from("__col_name__"),
+        );
+        let mut accum = agg.create_accumulator()?;
+        accum.update_batch(&arrays)?;
+        // should evaluate to 2 since "b" never seen
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(2)));
+        // now update with a new batch that does use "b"
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(1), Some(1), None]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        accum.update_batch(&arrays)?;
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(3)));
+        Ok(())
+    }
+
+    #[test]
+    fn count_distinct_dict_merge() -> Result<()> {
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(0), Some(0), None]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        let agg = DistinctCount::new(
+            arrays[0].data_type().clone(),
+            Arc::new(NoOp::new()),
+            String::from("__col_name__"),
+        );
+        // create accum with 1 value seen
+        let mut accum = agg.create_accumulator()?;
+        accum.update_batch(&arrays)?;
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(1)));
+        // create accum with state that has seen "a" and "b" but not "c"
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(0), Some(1), None]);

Review Comment:
   Yes this would break under my current assumption, will change some stuff and get this working again.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1133280592


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -577,4 +746,106 @@ mod tests {
         assert_eq!(result, ScalarValue::Int64(Some(2)));
         Ok(())
     }
+
+    #[test]
+    fn count_distinct_dict_update() -> Result<()> {
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        // value "b" is never used
+        let keys =
+            Int8Array::from_iter(vec![Some(0), Some(0), Some(0), Some(0), None, Some(2)]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        let agg = DistinctCount::new(
+            arrays[0].data_type().clone(),
+            Arc::new(NoOp::new()),
+            String::from("__col_name__"),
+        );
+        let mut accum = agg.create_accumulator()?;
+        accum.update_batch(&arrays)?;
+        // should evaluate to 2 since "b" never seen
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(2)));
+        // now update with a new batch that does use "b"
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(1), Some(1), None]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        accum.update_batch(&arrays)?;
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(3)));
+        Ok(())
+    }
+
+    #[test]
+    fn count_distinct_dict_merge() -> Result<()> {
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(0), Some(0), None]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        let agg = DistinctCount::new(
+            arrays[0].data_type().clone(),
+            Arc::new(NoOp::new()),
+            String::from("__col_name__"),
+        );
+        // create accum with 1 value seen
+        let mut accum = agg.create_accumulator()?;
+        accum.update_batch(&arrays)?;
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(1)));
+        // create accum with state that has seen "a" and "b" but not "c"
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(0), Some(1), None]);

Review Comment:
   This is a good test case, thanks 💪



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] waynexia commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "waynexia (via GitHub)" <gi...@apache.org>.
waynexia commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1133399584


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -192,17 +230,150 @@ impl Accumulator for DistinctCountAccumulator {
         }
     }
 }
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    /// laziliy initialized state that holds a boolean for each index.
+    /// the bool at each index indicates whether the value for that index has been seen yet.
+    state: Option<Vec<bool>>,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("state", &self.state)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new() -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            state: None,
+        }
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {

Review Comment:
   Sorry I didn't notice this is inside the trait impl block...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1135760728


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -31,7 +32,7 @@ use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 
 type DistinctScalarValues = ScalarValue;
-
+type ValueSet = HashSet<DistinctScalarValues, RandomState>;

Review Comment:
   I was a bit confused about the purpose of `DistinctScalarValues` as well to be honest but I kind of figured it was there for a good reason so I left it in 😅
   
   In terms of the added `ValueSet` alias, I personally thought it made the code a bit more readable but that is kind of subjective of course.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1135773543


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +219,98 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new(values_datatype: DataType) -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            values: Default::default(),
+            values_datatype,
         }
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        values_to_state(&self.values, &self.values_datatype)
+    }
 
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Unexpected accumulator state".into(),
-                ));
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {

Review Comment:
   I sortof assumed that `update_batch` is called frequently enough that the extra code was worth but someone more experienced with datafusion would know better than me about this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jhorstmann commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jhorstmann (via GitHub)" <gi...@apache.org>.
jhorstmann commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1142109702


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +220,96 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new(values_datatype: DataType) -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            values: Default::default(),
+            values_datatype,
         }
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        values_to_state(&self.values, &self.values_datatype)
+    }
 
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Unexpected accumulator state".into(),
-                ));
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let arr = as_dictionary_array::<K>(&values[0])?;
+        let nvalues = arr.values().len();
+        // map keys to whether their corresponding value has been seen or not
+        let mut seen_map = vec![false; nvalues];

Review Comment:
   `seen_map` could become a bitmap to save space.
   
   High-cardinality inputs relative to the batch size (like `UserID` in clickbench) probably don't benefit that much from this map. I don't know the standard batch size that datafusion uses for that query, but a much larger batch size could improve the performance in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1133280240


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -192,17 +230,150 @@ impl Accumulator for DistinctCountAccumulator {
         }
     }
 }
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    /// laziliy initialized state that holds a boolean for each index.
+    /// the bool at each index indicates whether the value for that index has been seen yet.
+    state: Option<Vec<bool>>,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("state", &self.state)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new() -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            state: None,
+        }
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        if let Some(state) = &self.state {
+            let bools = state
+                .iter()
+                .map(|b| ScalarValue::Boolean(Some(*b)))
+                .collect();
+            Ok(vec![ScalarValue::List(
+                Some(bools),
+                Box::new(Field::new("item", DataType::Boolean, false)),
+            )])
+        } else {
+            // empty state
+            Ok(vec![])
+        }
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let arr = as_dictionary_array::<K>(&values[0])?;
+        let nvalues = arr.values().len();
+        if let Some(state) = &self.state {
+            if state.len() != nvalues {
+                return Err(DataFusionError::Internal(
+                    "Accumulator update_batch got invalid value".to_string(),
+                ));
+            }
+        } else {
+            // init state
+            self.state = Some((0..nvalues).map(|_| false).collect());
+        }
+        for idx in arr.keys_iter().flatten() {
+            self.state.as_mut().unwrap()[idx] = true;
+        }

Review Comment:
   I was under the impression that this is the case with how the datafusion accumulators work. I'm fairly new to learning datafusion internals though so I might be wrong about that! 
   
   I'll look into this more that is a good point and a large assumption on my end 😅



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1133280154


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -192,17 +230,150 @@ impl Accumulator for DistinctCountAccumulator {
         }
     }
 }
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    /// laziliy initialized state that holds a boolean for each index.
+    /// the bool at each index indicates whether the value for that index has been seen yet.
+    state: Option<Vec<bool>>,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("state", &self.state)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new() -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            state: None,
+        }
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {

Review Comment:
   This doesn't compile since `state` is part of `Accumulator` trait.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1466575480

   I plan to review this PR tomorrow. Thank you @waynexia  for the review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] sundy-li commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "sundy-li (via GitHub)" <gi...@apache.org>.
sundy-li commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1469495849

   For numeric/string args in distinct, I think it's better to have special states rather than putting the enum into the HashSet.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1472087640

   > ClickBench count distinct query when using dictionary columns is getting killed (this is on main as well as the PR) 🤔
   
   I wonder if we can try a smaller subset 🤔 
   
   ```sql
   ❯ CREATE TABLE hits as select
     arrow_cast("UserID", 'Dictionary(Int32, Utf8)') as "UserID"
   FROM 'hits.parquet'
   limit 10000000;
   0 rows in set. Query took 0.776 seconds.
   ❯ select count(distinct "UserID") from hits;
   +-----------------------------+
   | COUNT(DISTINCT hits.UserID) |
   +-----------------------------+
   | 1530334                     |
   +-----------------------------+
   1 row in set. Query took 71.388 seconds.
   ```
   
   I will try this on my benchmark machine


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1471857130

   @jaylmiller 
   How do you do the benchmark test ? I do not get a chance to take a closer look at this PR. 
   There is one logical optimization rule `RewriteDistinctAggregate` in DataFusion which will rewrite the single distinct aggregate to normal group by aggregate and replace the distinct aggregator to normal count aggregator.
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1142178135


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +220,96 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new(values_datatype: DataType) -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            values: Default::default(),
+            values_datatype,
         }
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        values_to_state(&self.values, &self.values_datatype)
+    }
 
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Unexpected accumulator state".into(),
-                ));
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let arr = as_dictionary_array::<K>(&values[0])?;
+        let nvalues = arr.values().len();
+        // map keys to whether their corresponding value has been seen or not
+        let mut seen_map = vec![false; nvalues];

Review Comment:
   Thanks for suggestion. Just to clarify, you mean using arrow bitmap, correct? Something like
   
   ```rs
   let mut bitmap = arrow::array::BooleanBufferBuilder::new(nvalues);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1487549926

   Marking as draft to signify this PR has feedback and is not waiting for another review at the moment. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] waynexia commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "waynexia (via GitHub)" <gi...@apache.org>.
waynexia commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1133226701


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -192,17 +230,150 @@ impl Accumulator for DistinctCountAccumulator {
         }
     }
 }
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    /// laziliy initialized state that holds a boolean for each index.
+    /// the bool at each index indicates whether the value for that index has been seen yet.
+    state: Option<Vec<bool>>,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("state", &self.state)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new() -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            state: None,
+        }
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {

Review Comment:
   ```suggestion
       #[cfg(test)]
       fn state(&self) -> Result<Vec<ScalarValue>> {
   ```
   
   It seems that this is only utilized in tests. Would it be better to addd the cfg annotation?



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -192,17 +230,150 @@ impl Accumulator for DistinctCountAccumulator {
         }
     }
 }
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    /// laziliy initialized state that holds a boolean for each index.
+    /// the bool at each index indicates whether the value for that index has been seen yet.
+    state: Option<Vec<bool>>,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("state", &self.state)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new() -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            state: None,
+        }
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        if let Some(state) = &self.state {
+            let bools = state
+                .iter()
+                .map(|b| ScalarValue::Boolean(Some(*b)))
+                .collect();
+            Ok(vec![ScalarValue::List(
+                Some(bools),
+                Box::new(Field::new("item", DataType::Boolean, false)),
+            )])
+        } else {
+            // empty state
+            Ok(vec![])
+        }
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let arr = as_dictionary_array::<K>(&values[0])?;
+        let nvalues = arr.values().len();
+        if let Some(state) = &self.state {
+            if state.len() != nvalues {
+                return Err(DataFusionError::Internal(
+                    "Accumulator update_batch got invalid value".to_string(),
+                ));
+            }
+        } else {
+            // init state
+            self.state = Some((0..nvalues).map(|_| false).collect());
+        }
+        for idx in arr.keys_iter().flatten() {
+            self.state.as_mut().unwrap()[idx] = true;
+        }

Review Comment:
   Are we guaranteed that the value set is consistent among every dictionary array? If not I'm afraid we still need some hash in this accumulator.



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -577,4 +746,106 @@ mod tests {
         assert_eq!(result, ScalarValue::Int64(Some(2)));
         Ok(())
     }
+
+    #[test]
+    fn count_distinct_dict_update() -> Result<()> {
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        // value "b" is never used
+        let keys =
+            Int8Array::from_iter(vec![Some(0), Some(0), Some(0), Some(0), None, Some(2)]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        let agg = DistinctCount::new(
+            arrays[0].data_type().clone(),
+            Arc::new(NoOp::new()),
+            String::from("__col_name__"),
+        );
+        let mut accum = agg.create_accumulator()?;
+        accum.update_batch(&arrays)?;
+        // should evaluate to 2 since "b" never seen
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(2)));
+        // now update with a new batch that does use "b"
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(1), Some(1), None]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        accum.update_batch(&arrays)?;
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(3)));
+        Ok(())
+    }
+
+    #[test]
+    fn count_distinct_dict_merge() -> Result<()> {
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(0), Some(0), None]);
+        let arrays =
+            vec![
+                Arc::new(DictionaryArray::<Int8Type>::try_new(&keys, &values).unwrap())
+                    as ArrayRef,
+            ];
+        let agg = DistinctCount::new(
+            arrays[0].data_type().clone(),
+            Arc::new(NoOp::new()),
+            String::from("__col_name__"),
+        );
+        // create accum with 1 value seen
+        let mut accum = agg.create_accumulator()?;
+        accum.update_batch(&arrays)?;
+        assert_eq!(accum.evaluate()?, ScalarValue::Int64(Some(1)));
+        // create accum with state that has seen "a" and "b" but not "c"
+        let values = StringArray::from_iter_values(["a", "b", "c"]);
+        let keys = Int8Array::from_iter(vec![Some(0), Some(1), None]);

Review Comment:
   what about changing the second dictionary to (reorder the value set)
   - value: ["c", "b", "a"]
   - keys: [2, 1, None]



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1472134068

   Any luck? 
   I think the rule `SingleDistinctToGroupBy` has conflicts with the optimization and rewriting `Distinct` to `Group By` is not always beneficial.  Maybe we should add a configuration to turn on/off this rewriting.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jackwener commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1468427935

   A great job. I also notice this performance(I notice it by ClickBeach). Thanks for your job @jaylmiller .
   I prepare to review it tomorrow.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1135769488


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +219,98 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new(values_datatype: DataType) -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            values: Default::default(),
+            values_datatype,
         }
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        values_to_state(&self.values, &self.values_datatype)
+    }
 
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Unexpected accumulator state".into(),
-                ));
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {

Review Comment:
   Yes I was thinking this myself and decided to err on the side of less dispatch. I'm not super opinionated either way though so I don't mind changing it if you think it's better to go for code simplicity.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jackwener commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jackwener (via GitHub)" <gi...@apache.org>.
jackwener commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1469494475

   cc @sundy-li 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] alamb commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "alamb (via GitHub)" <gi...@apache.org>.
alamb commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1135516150


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -85,64 +86,124 @@ impl AggregateExpr for DistinctCount {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(DistinctCountAccumulator {
-            values: HashSet::default(),
-            state_data_type: self.state_data_type.clone(),
-        }))
+        use arrow::datatypes;
+        use datatypes::DataType::*;
+
+        Ok(match &self.state_data_type {
+            Dictionary(key, val) if key.is_dictionary_key_type() => {
+                let val_type = *val.clone();
+                match **key {
+                    Int8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int16 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int16Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int32 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int32Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int64 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int64Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::UInt8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt16 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt16Type,
+                    >::new(val_type)),
+                    UInt32 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt32Type,
+                    >::new(val_type)),
+                    UInt64 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt64Type,
+                    >::new(val_type)),
+                    _ => {
+                        // just checked that datatype is a valid dict key type
+                        unreachable!()

Review Comment:
   It would be preferable to fix this prior to merge, but I think it would also be fine to merge as is



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1468367900

   > Also, I wonder if you have had a chance to do any sort of benchmarking to show the improvement?
   
   Currently looking into this. Will post findings today


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] mingmwang commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "mingmwang (via GitHub)" <gi...@apache.org>.
mingmwang commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1471902012

   > I am essentially just running clickbench queries against my PR and against main. The only change is I am setting RegionID column to a dict:
   > 
   > ```
   > CREATE EXTERNAL TABLE hits_base
   > STORED AS PARQUET
   > LOCATION 'hits.parquet';
   > CREATE TABLE hits as
   > select 
   >   arrow_cast("RegionID", 'Dictionary(Int32, Utf8)') as "RegionID"
   > ....
   > ```
   > 
   > Do you have any recommendations, @mingmwang ?
   
   I had never check clickbench queries. Maybe you can comment the rule `SingleDistinctToGroupBy` and run the benchmark again and see whether there are improvements.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1471896179

   I am essentially just running clickbench queries against my PR and against main. The only change is I am setting RegionID column to a dict:
   ```
   CREATE EXTERNAL TABLE hits_base
   STORED AS PARQUET
   LOCATION 'hits.parquet';
   CREATE TABLE hits as
   select 
     arrow_cast("RegionID", 'Dictionary(Int32, Utf8)') as "RegionID"
   ....
   ```
   
   Do you have any recommendations, @mingmwang ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] waynexia commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "waynexia (via GitHub)" <gi...@apache.org>.
waynexia commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1137258276


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -85,64 +85,126 @@ impl AggregateExpr for DistinctCount {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(DistinctCountAccumulator {
-            values: HashSet::default(),
-            state_data_type: self.state_data_type.clone(),
-        }))
+        use arrow::datatypes;
+        use datatypes::DataType::*;
+
+        Ok(match &self.state_data_type {
+            Dictionary(key, val) if key.is_dictionary_key_type() => {
+                let val_type = *val.clone();
+                match **key {
+                    Int8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int16 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int16Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int32 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int32Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int64 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int64Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::UInt8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt16 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt16Type,
+                    >::new(val_type)),
+                    UInt32 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt32Type,
+                    >::new(val_type)),
+                    UInt64 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt64Type,
+                    >::new(val_type)),
+                    _ => {
+                        return Err(DataFusionError::Internal(
+                            "Dict key has invalid datatype".to_string(),
+                        ))
+                    }
+                }
+            }
+            _ => Box::new(DistinctCountAccumulator {
+                values: HashSet::default(),
+                state_data_type: self.state_data_type.clone(),
+            }),
+        })
     }
 
     fn name(&self) -> &str {
         &self.name
     }
 }
 
-#[derive(Debug)]
-struct DistinctCountAccumulator {
-    values: HashSet<DistinctScalarValues, RandomState>,
-    state_data_type: DataType,
+// calculating the size of values hashset for fixed length values,
+// taking first batch size * number of batches.
+// This method is faster than full_size(), however it is not suitable for variable length
+// values like strings or complex types

Review Comment:
   ```suggestion
   /// calculating the size of values hashset for fixed length values,
   /// taking first batch size * number of batches.
   /// This method is faster than full_size(), however it is not suitable for variable length
   /// values like strings or complex types
   ```
   
   style: prefer to use document comments



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -85,64 +85,126 @@ impl AggregateExpr for DistinctCount {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(DistinctCountAccumulator {
-            values: HashSet::default(),
-            state_data_type: self.state_data_type.clone(),
-        }))
+        use arrow::datatypes;
+        use datatypes::DataType::*;
+
+        Ok(match &self.state_data_type {
+            Dictionary(key, val) if key.is_dictionary_key_type() => {
+                let val_type = *val.clone();
+                match **key {
+                    Int8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int16 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int16Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int32 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int32Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int64 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int64Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::UInt8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt16 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt16Type,
+                    >::new(val_type)),
+                    UInt32 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt32Type,
+                    >::new(val_type)),
+                    UInt64 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt64Type,
+                    >::new(val_type)),
+                    _ => {
+                        return Err(DataFusionError::Internal(
+                            "Dict key has invalid datatype".to_string(),

Review Comment:
   nit: I would prefer to add the concrete type in the error message



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +220,96 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>

Review Comment:
   ```suggestion
   }
   
   impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
   ```
   
   style: add empty line between two blocks



##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -85,64 +85,126 @@ impl AggregateExpr for DistinctCount {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(DistinctCountAccumulator {
-            values: HashSet::default(),
-            state_data_type: self.state_data_type.clone(),
-        }))
+        use arrow::datatypes;
+        use datatypes::DataType::*;
+
+        Ok(match &self.state_data_type {
+            Dictionary(key, val) if key.is_dictionary_key_type() => {
+                let val_type = *val.clone();
+                match **key {
+                    Int8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int16 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int16Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int32 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int32Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    Int64 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::Int64Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt8 => Box::new(
+                        CountDistinctDictAccumulator::<datatypes::UInt8Type>::new(
+                            val_type,
+                        ),
+                    ),
+                    UInt16 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt16Type,
+                    >::new(val_type)),
+                    UInt32 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt32Type,
+                    >::new(val_type)),
+                    UInt64 => Box::new(CountDistinctDictAccumulator::<
+                        datatypes::UInt64Type,
+                    >::new(val_type)),
+                    _ => {
+                        return Err(DataFusionError::Internal(
+                            "Dict key has invalid datatype".to_string(),
+                        ))
+                    }
+                }
+            }
+            _ => Box::new(DistinctCountAccumulator {
+                values: HashSet::default(),
+                state_data_type: self.state_data_type.clone(),
+            }),
+        })
     }
 
     fn name(&self) -> &str {
         &self.name
     }
 }
 
-#[derive(Debug)]
-struct DistinctCountAccumulator {
-    values: HashSet<DistinctScalarValues, RandomState>,
-    state_data_type: DataType,
+// calculating the size of values hashset for fixed length values,
+// taking first batch size * number of batches.
+// This method is faster than full_size(), however it is not suitable for variable length
+// values like strings or complex types
+fn values_fixed_size(values: &ValueSet) -> usize {
+    (std::mem::size_of::<ScalarValue>() * values.capacity())
+        + values
+            .iter()
+            .next()
+            .map(|vals| ScalarValue::size(vals) - std::mem::size_of_val(vals))
+            .unwrap_or(0)
+}
+// calculates the size as accurate as possible, call to this method is expensive

Review Comment:
   ```suggestion
   }
   
   // calculates the size as accurate as possible, call to this method is expensive
   ```
   
   style: add empty line between two fns



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [arrow-datafusion] jaylmiller commented on pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

Posted by "jaylmiller (via GitHub)" <gi...@apache.org>.
jaylmiller commented on PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#issuecomment-1471024770

   I'm not seeing any noticeable ClickBench improvements when changing "RegionID" column to use dict encoding (setting other columns to use dict encoding cause my datafusion process to be killed...) 
   
   Maybe this change is not worth merging if ClickBench improvements aren't being seen


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org