You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by "comphead (via GitHub)" <gi...@apache.org> on 2023/03/08 17:51:58 UTC

[GitHub] [arrow-datafusion] comphead commented on a diff in pull request #5408: refactor count_distinct to not to have update and merge

comphead commented on code in PR #5408:
URL: https://github.com/apache/arrow-datafusion/pull/5408#discussion_r1129835379


##########
datafusion/physical-expr/src/aggregate/count_distinct.rs:
##########
@@ -159,118 +112,80 @@ impl DistinctCountAccumulator {
                 .values
                 .iter()
                 .next()
-                .map(|vals| {
-                    (ScalarValue::size_of_vec(&vals.0) - std::mem::size_of_val(&vals.0))
-                        * self.values.capacity()
-                })
+                .map(|vals| ScalarValue::size(vals) - std::mem::size_of_val(&vals))
                 .unwrap_or(0)
     }
-
-    // calculates the size as accurate as possible, call to this method is expensive
-    fn full_size(&self) -> usize {
-        std::mem::size_of_val(self)
-            + (std::mem::size_of::<DistinctScalarValues>() * self.values.capacity())
-            + self
-                .values
-                .iter()
-                .map(|vals| {
-                    ScalarValue::size_of_vec(&vals.0) - std::mem::size_of_val(&vals.0)
-                })
-                .sum::<usize>()
-            + (std::mem::size_of::<DataType>() * self.state_data_types.capacity())
-            + self
-                .state_data_types
-                .iter()
-                .map(|dt| dt.size() - std::mem::size_of_val(dt))
-                .sum::<usize>()
-            + self.count_data_type.size()
-            - std::mem::size_of_val(&self.count_data_type)
-    }
 }
 
 impl Accumulator for DistinctCountAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        let mut cols_out =
+            ScalarValue::new_list(Some(Vec::new()), self.state_data_type.clone());
+        self.values
+            .iter()
+            .enumerate()
+            .for_each(|(_, distinct_values)| {
+                if let ScalarValue::List(Some(ref mut v), _) = cols_out {
+                    v.push(distinct_values.clone());
+                }
+            });
+        Ok(vec![cols_out])
+    }
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
         if values.is_empty() {
             return Ok(());
         }
-        (0..values[0].len()).try_for_each(|index| {
-            let v = values
-                .iter()
-                .map(|array| ScalarValue::try_from_array(array, index))
-                .collect::<Result<Vec<_>>>()?;
-            self.update(&v)
+        let arr = &values[0];
+        (0..arr.len()).try_for_each(|index| {
+            if !arr.is_null(index) {
+                let scalar = ScalarValue::try_from_array(arr, index)?;
+                self.values.insert(scalar);
+            }
+            Ok(())
         })
     }
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
         if states.is_empty() {
             return Ok(());
         }
-        (0..states[0].len()).try_for_each(|index| {
-            let v = states
-                .iter()
-                .map(|array| ScalarValue::try_from_array(array, index))
-                .collect::<Result<Vec<_>>>()?;
-            self.merge(&v)
+        let arr = &states[0];
+        (0..arr.len()).try_for_each(|index| {
+            let scalar = ScalarValue::try_from_array(arr, index)?;
+
+            if let ScalarValue::List(Some(scalar), _) = scalar {
+                scalar.iter().for_each(|scalar| {
+                    if !ScalarValue::is_null(scalar) {
+                        self.values.insert(scalar.clone());
+                    }
+                });
+            } else {
+                return Err(DataFusionError::Internal(
+                    "Unexpected accumulator state".into(),
+                ));
+            }
+            Ok(())
         })
     }
-    fn state(&self) -> Result<Vec<ScalarValue>> {
-        let mut cols_out = self
-            .state_data_types
-            .iter()
-            .map(|state_data_type| {
-                ScalarValue::new_list(Some(Vec::new()), state_data_type.clone())
-            })
-            .collect::<Vec<_>>();
-
-        let mut cols_vec = cols_out
-            .iter_mut()
-            .map(|c| match c {
-                ScalarValue::List(Some(ref mut v), _) => Ok(v),
-                t => Err(DataFusionError::Internal(format!(
-                    "cols_out should only consist of ScalarValue::List. {t:?} is found"
-                ))),
-            })
-            .collect::<Result<Vec<_>>>()?;
-
-        self.values.iter().for_each(|distinct_values| {
-            distinct_values.0.iter().enumerate().for_each(
-                |(col_index, distinct_value)| {
-                    cols_vec[col_index].push(distinct_value.clone());
-                },
-            )
-        });
-
-        Ok(cols_out.into_iter().collect())
-    }
 
     fn evaluate(&self) -> Result<ScalarValue> {
-        match &self.count_data_type {
-            DataType::Int64 => Ok(ScalarValue::Int64(Some(self.values.len() as i64))),
-            t => Err(DataFusionError::Internal(format!(
-                "Invalid data type {t:?} for count distinct aggregation"
-            ))),
-        }
+        Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
     }
 
     fn size(&self) -> usize {
-        if self.count_data_type.is_primitive() {

Review Comment:
   I think we lost size support for variable-sized values in this PR. Is that expected? @alamb



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org