You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "jhorstmann (via GitHub)" <gi...@apache.org> on 2023/03/20 13:26:00 UTC

[GitHub] [arrow-datafusion] jhorstmann commented on a diff in pull request #5554: Improve performance of COUNT (distinct x) for dictionary columns #258

jhorstmann commented on code in PR #5554:
URL: https://github.com/apache/arrow-datafusion/pull/5554#discussion_r1142109702


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -158,38 +220,96 @@ impl Accumulator for DistinctCountAccumulator {
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        merge_values(&mut self.values, states)
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
+    }
+
+    fn size(&self) -> usize {
+        let values_size = match &self.state_data_type {
+            DataType::Boolean | DataType::Null => values_fixed_size(&self.values),
+            d if d.is_primitive() => values_fixed_size(&self.values),
+            _ => values_full_size(&self.values),
+        };
+        std::mem::size_of_val(self) + values_size + std::mem::size_of::<DataType>()
+    }
+}
+/// Special case accumulator for counting distinct values in a dict
+struct CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    /// `K` is required when casting to dict array
+    _dt: core::marker::PhantomData<K>,
+    values_datatype: DataType,
+    values: ValueSet,
+}
+
+impl<K> std::fmt::Debug for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CountDistinctDictAccumulator")
+            .field("values", &self.values)
+            .field("values_datatype", &self.values_datatype)
+            .finish()
+    }
+}
+impl<K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync>
+    CountDistinctDictAccumulator<K>
+{
+    fn new(values_datatype: DataType) -> Self {
+        Self {
+            _dt: core::marker::PhantomData,
+            values: Default::default(),
+            values_datatype,
         }
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
+    }
+}
+impl<K> Accumulator for CountDistinctDictAccumulator<K>
+where
+    K: ArrowDictionaryKeyType + std::marker::Send + std::marker::Sync,
+{
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        values_to_state(&self.values, &self.values_datatype)
+    }
 
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return Err(DataFusionError::Internal(
-                    "Unexpected accumulator state".into(),
-                ));
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+        let arr = as_dictionary_array::<K>(&values[0])?;
+        let nvalues = arr.values().len();
+        // map keys to whether their corresponding value has been seen or not
+        let mut seen_map = vec![false; nvalues];

Review Comment:
   `seen_map` could become a bitmap to save space.
   
   High-cardinality inputs relative to the batch size (like `UserID` in clickbench) probably don't benefit that much from this map. I don't know the standard batch size that datafusion uses for that query, but a much larger batch size could improve the performance in this case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org