Posted to commits@arrow.apache.org by dh...@apache.org on 2021/08/12 18:37:49 UTC

[arrow-datafusion] branch master updated: Rework GroupByHash for faster performance and support grouping by nulls (#808)

This is an automated email from the ASF dual-hosted git repository.

dheres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new fa3f099  Rework GroupByHash for faster performance and support grouping by nulls (#808)
fa3f099 is described below

commit fa3f0998dfe2dfedb0ab67665329e177e000fc64
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Thu Aug 12 14:37:44 2021 -0400

    Rework GroupByHash for faster performance and support grouping by nulls (#808)
    
    * Implement faster GroupByHash design
    
    * Rewrite to use RawMap per Dandandan suggestion
    
    * remove stub
    
    * Return error with create_accumulators
    
    * Do not memoize group key creation
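    
    The new design, in short: hash all of the group columns once per
    batch with create_hashes, then probe a hashbrown RawTable that
    stores only (hash, group index) pairs; the group-by values,
    accumulators and per-batch row indices live out of line in a
    vector of group states. A simplified sketch of the probe/insert
    pattern (not code from this commit: the string keys, the toy
    hash function and the (String, Vec<u32>) tuples stand in for
    the real group columns, the ahash-based hashes and GroupState):
    
        use hashbrown::raw::RawTable;
    
        fn main() {
            let batch = ["a", "b", "a", "c", "b"];
    
            // Toy hash; the real code uses create_hashes / ahash.
            let hash_of = |s: &str| {
                s.bytes()
                    .fold(0u64, |h, b| h.wrapping_mul(31).wrapping_add(b as u64))
            };
    
            // The table stores (hash, index into `groups`); the group
            // key itself is kept out of line and is never re-hashed or
            // serialized by the table.
            let mut map: RawTable<(u64, usize)> = RawTable::new();
            // stand-in for Vec<GroupState>: (group key, row indices)
            let mut groups: Vec<(String, Vec<u32>)> = Vec::new();
    
            for (row, key) in batch.iter().enumerate() {
                let hash = hash_of(key);
                // Probe with the precomputed hash, then confirm the real
                // key matches (different keys may share a hash).
                let entry = map.get_mut(hash, |(_h, idx)| groups[*idx].0 == *key);
                match entry {
                    Some((_h, idx)) => groups[*idx].1.push(row as u32),
                    None => {
                        let idx = groups.len();
                        groups.push((key.to_string(), vec![row as u32]));
                        // The hasher closure reuses the stored hash
                        // instead of recomputing it.
                        map.insert(hash, (hash, idx), |(h, _)| *h);
                    }
                }
            }
    
            for (key, indices) in &groups {
                println!("group {:?} -> rows {:?}", key, indices);
            }
        }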
---
 datafusion/Cargo.toml                          |   2 +-
 datafusion/src/physical_plan/hash_aggregate.rs | 458 +++++++------------------
 2 files changed, 130 insertions(+), 330 deletions(-)

diff --git a/datafusion/Cargo.toml b/datafusion/Cargo.toml
index 2716cc7..286be8a 100644
--- a/datafusion/Cargo.toml
+++ b/datafusion/Cargo.toml
@@ -48,7 +48,7 @@ force_hash_collisions = []
 
 [dependencies]
 ahash = "0.7"
-hashbrown = "0.11"
+hashbrown = { version = "0.11", features = ["raw"] }
 arrow = { version = "5.1", features = ["prettyprint"] }
 parquet = { version = "5.1", features = ["arrow"] }
 sqlparser = "0.9.0"
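
The hash_aggregate.rs diff below drops the byte-serialized group keys
entirely: group values are copied out of the arrays with
ScalarValue::try_from_array and compared back against candidate rows
with ScalarValue::eq_array, which also gives NULL group keys a
well-defined representation. A small standalone illustration (not part
of this commit; it assumes the datafusion and arrow versions at this
commit, and that eq_array treats a NULL scalar as equal to a NULL array
slot, which is what grouping by nulls relies on):

    use std::sync::Arc;

    use arrow::array::{ArrayRef, Int32Array};
    use datafusion::error::Result;
    use datafusion::scalar::ScalarValue;

    fn main() -> Result<()> {
        // A nullable group column with a NULL in row 1
        let col: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(1)]));

        // Materialize the group key at row 1; this yields ScalarValue::Int32(None)
        let key = ScalarValue::try_from_array(&col, 1)?;

        // Compare the key back against array rows, as the new group lookup
        // does; the NULL key is expected to match only the NULL row.
        assert!(key.eq_array(&col, 1));
        assert!(!key.eq_array(&col, 0));
        Ok(())
    }
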
diff --git a/datafusion/src/physical_plan/hash_aggregate.rs b/datafusion/src/physical_plan/hash_aggregate.rs
index 5c3c576..1c07f61 100644
--- a/datafusion/src/physical_plan/hash_aggregate.rs
+++ b/datafusion/src/physical_plan/hash_aggregate.rs
@@ -27,44 +27,28 @@ use futures::{
     stream::{Stream, StreamExt},
     Future,
 };
+use hashbrown::HashMap;
 
 use crate::error::{DataFusionError, Result};
+use crate::physical_plan::hash_utils::create_hashes;
 use crate::physical_plan::{
     Accumulator, AggregateExpr, DisplayFormatType, Distribution, ExecutionPlan,
     Partitioning, PhysicalExpr, SQLMetric,
 };
 use crate::scalar::ScalarValue;
 
+use arrow::{array::ArrayRef, compute, compute::cast};
 use arrow::{
     array::{Array, UInt32Builder},
     error::{ArrowError, Result as ArrowResult},
 };
 use arrow::{
-    array::{
-        ArrayRef, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array,
-        Int8Array, StringArray, UInt16Array, UInt32Array, UInt64Array, UInt8Array,
-    },
-    compute,
-};
-use arrow::{
-    array::{BooleanArray, Date32Array, DictionaryArray},
-    compute::cast,
-    datatypes::{
-        ArrowDictionaryKeyType, ArrowNativeType, Int16Type, Int32Type, Int64Type,
-        Int8Type, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
-    },
-};
-use arrow::{
-    datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit},
+    datatypes::{Field, Schema, SchemaRef},
     record_batch::RecordBatch,
 };
-use hashbrown::HashMap;
+use hashbrown::raw::RawTable;
 use pin_project_lite::pin_project;
 
-use arrow::array::{
-    LargeStringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
-    TimestampNanosecondArray,
-};
 use async_trait::async_trait;
 
 use super::{expressions::Column, RecordBatchStream, SendableRecordBatchStream};
@@ -339,6 +323,7 @@ pin_project! {
 
 fn group_aggregate_batch(
     mode: &AggregateMode,
+    random_state: &RandomState,
     group_expr: &[Arc<dyn PhysicalExpr>],
     aggr_expr: &[Arc<dyn AggregateExpr>],
     batch: RecordBatch,
@@ -361,57 +346,74 @@ fn group_aggregate_batch(
         group_by_values.push(ScalarValue::UInt32(Some(0)));
     }
 
-    let mut group_by_values = group_by_values.into_boxed_slice();
-
-    let mut key = Vec::with_capacity(group_values.len());
-
     // 1.1 construct the key from the group values
     // 1.2 construct the mapping key if it does not exist
     // 1.3 add the row's index to `indices`
 
-    // Make sure we can create the accumulators or otherwise return an error
-    create_accumulators(aggr_expr).map_err(DataFusionError::into_arrow_external_error)?;
+    // track which entries in `accumulators` have rows in this batch to aggregate
+    let mut groups_with_rows = vec![];
 
-    // Keys received in this batch
-    let mut batch_keys = vec![];
+    // 1.1 Calculate the group keys for the group values
+    let mut batch_hashes = vec![0; batch.num_rows()];
+    create_hashes(&group_values, random_state, &mut batch_hashes)?;
 
-    for row in 0..batch.num_rows() {
-        // 1.1
-        create_key(&group_values, row, &mut key)
-            .map_err(DataFusionError::into_arrow_external_error)?;
+    for (row, hash) in batch_hashes.into_iter().enumerate() {
+        let Accumulators { map, group_states } = &mut accumulators;
 
-        accumulators
-            .raw_entry_mut()
-            .from_key(&key)
-            // 1.3
-            .and_modify(|_, (_, _, v)| {
-                if v.is_empty() {
-                    batch_keys.push(key.clone())
+        let entry = map.get_mut(hash, |(_hash, group_idx)| {
+            // verify that the group we are probing for with this hash
+            // actually has the same key value as the incoming row
+            // (i.e. group_values @ row)
+            let group_state = &group_states[*group_idx];
+            group_values
+                .iter()
+                .zip(group_state.group_by_values.iter())
+                .all(|(array, scalar)| scalar.eq_array(array, row))
+        });
+
+        match entry {
+            // Existing entry for this group value
+            Some((_hash, group_idx)) => {
+                let group_state = &mut group_states[*group_idx];
+                // 1.3
+                if group_state.indices.is_empty() {
+                    groups_with_rows.push(*group_idx);
                 };
-                v.push(row as u32)
-            })
-            // 1.2
-            .or_insert_with(|| {
-                // We can safely unwrap here as we checked we can create an accumulator before
-                let accumulator_set = create_accumulators(aggr_expr).unwrap();
-                batch_keys.push(key.clone());
-                // Note it would be nice to make this a real error (rather than panic)
-                // but it is better than silently ignoring the issue and getting wrong results
-                create_group_by_values(&group_values, row, &mut group_by_values)
-                    .expect("can not create group by value");
-                (
-                    key.clone(),
-                    (group_by_values.clone(), accumulator_set, vec![row as u32]),
-                )
-            });
+                group_state.indices.push(row as u32); // remember this row
+            }
+            //  1.2 Need to create new entry
+            None => {
+                let accumulator_set = create_accumulators(aggr_expr)
+                    .map_err(DataFusionError::into_arrow_external_error)?;
+
+                // Copy group values out of arrays into `ScalarValue`s
+                let group_by_values = group_values
+                    .iter()
+                    .map(|col| ScalarValue::try_from_array(col, row))
+                    .collect::<Result<Vec<_>>>()?;
+
+                // Add new entry to group_states and save newly created index
+                let group_state = GroupState {
+                    group_by_values: group_by_values.into_boxed_slice(),
+                    accumulator_set,
+                    indices: vec![row as u32], // 1.3
+                };
+                let group_idx = group_states.len();
+                group_states.push(group_state);
+                groups_with_rows.push(group_idx);
+
+                // for hasher function, use precomputed hash value
+                map.insert(hash, (hash, group_idx), |(hash, _group_idx)| *hash);
+            }
+        };
     }
 
     // Collect all indices + offsets based on keys in this vec
     let mut batch_indices: UInt32Builder = UInt32Builder::new(0);
     let mut offsets = vec![0];
     let mut offset_so_far = 0;
-    for key in batch_keys.iter() {
-        let (_, _, indices) = accumulators.get_mut(key).unwrap();
+    for group_idx in groups_with_rows.iter() {
+        let indices = &accumulators.group_states[*group_idx].indices;
         batch_indices.append_slice(indices)?;
         offset_so_far += indices.len();
         offsets.push(offset_so_far);
@@ -442,13 +444,14 @@ fn group_aggregate_batch(
     // 2.3 `slice` from each of its arrays the keys' values
     // 2.4 update / merge the accumulator with the values
     // 2.5 clear indices
-    batch_keys
-        .iter_mut()
+    groups_with_rows
+        .iter()
         .zip(offsets.windows(2))
-        .try_for_each(|(key, offsets)| {
-            let (_, accumulator_set, indices) = accumulators.get_mut(key).unwrap();
+        .try_for_each(|(group_idx, offsets)| {
+            let group_state = &mut accumulators.group_states[*group_idx];
             // 2.2
-            accumulator_set
+            group_state
+                .accumulator_set
                 .iter_mut()
                 .zip(values.iter())
                 .map(|(accumulator, aggr_array)| {
@@ -472,238 +475,12 @@ fn group_aggregate_batch(
                 })
                 // 2.5
                 .and({
-                    indices.clear();
+                    group_state.indices.clear();
                     Ok(())
                 })
         })?;
-    Ok(accumulators)
-}
 
-/// Appends a sequence of [u8] bytes for the value in `col[row]` to
-/// `vec` to be used as a key into the hash map for a dictionary type
-///
-/// Note that ideally, for dictionary encoded columns, we would be
-/// able to simply use the dictionary idicies themselves (no need to
-/// look up values) or possibly simply build the hash table entirely
-/// on the dictionary indexes.
-///
-/// This aproach would likely work (very) well for the common case,
-/// but it also has to to handle the case where the dictionary itself
-/// is not the same across all record batches (and thus indexes in one
-/// record batch may not correspond to the same index in another)
-fn dictionary_create_key_for_col<K: ArrowDictionaryKeyType>(
-    col: &ArrayRef,
-    row: usize,
-    vec: &mut Vec<u8>,
-) -> Result<()> {
-    let dict_col = col.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
-
-    // look up the index in the values dictionary
-    let keys_col = dict_col.keys();
-    let values_index = keys_col.value(row).to_usize().ok_or_else(|| {
-        DataFusionError::Internal(format!(
-            "Can not convert index to usize in dictionary of type creating group by value {:?}",
-            keys_col.data_type()
-        ))
-    })?;
-
-    create_key_for_col(dict_col.values(), values_index, vec)
-}
-
-/// Appends a sequence of [u8] bytes for the value in `col[row]` to
-/// `vec` to be used as a key into the hash map.
-///
-/// NOTE: This function does not check col.is_valid(). Caller must do so
-fn create_key_for_col(col: &ArrayRef, row: usize, vec: &mut Vec<u8>) -> Result<()> {
-    match col.data_type() {
-        DataType::Boolean => {
-            let array = col.as_any().downcast_ref::<BooleanArray>().unwrap();
-            vec.extend_from_slice(&[array.value(row) as u8]);
-        }
-        DataType::Float32 => {
-            let array = col.as_any().downcast_ref::<Float32Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Float64 => {
-            let array = col.as_any().downcast_ref::<Float64Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::UInt8 => {
-            let array = col.as_any().downcast_ref::<UInt8Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::UInt16 => {
-            let array = col.as_any().downcast_ref::<UInt16Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::UInt32 => {
-            let array = col.as_any().downcast_ref::<UInt32Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::UInt64 => {
-            let array = col.as_any().downcast_ref::<UInt64Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Int8 => {
-            let array = col.as_any().downcast_ref::<Int8Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Int16 => {
-            let array = col.as_any().downcast_ref::<Int16Array>().unwrap();
-            vec.extend(array.value(row).to_le_bytes().iter());
-        }
-        DataType::Int32 => {
-            let array = col.as_any().downcast_ref::<Int32Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Int64 => {
-            let array = col.as_any().downcast_ref::<Int64Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Timestamp(TimeUnit::Millisecond, None) => {
-            let array = col
-                .as_any()
-                .downcast_ref::<TimestampMillisecondArray>()
-                .unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Timestamp(TimeUnit::Microsecond, None) => {
-            let array = col
-                .as_any()
-                .downcast_ref::<TimestampMicrosecondArray>()
-                .unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Timestamp(TimeUnit::Nanosecond, None) => {
-            let array = col
-                .as_any()
-                .downcast_ref::<TimestampNanosecondArray>()
-                .unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Utf8 => {
-            let array = col.as_any().downcast_ref::<StringArray>().unwrap();
-            let value = array.value(row);
-            // store the size
-            vec.extend_from_slice(&value.len().to_le_bytes());
-            // store the string value
-            vec.extend_from_slice(value.as_bytes());
-        }
-        DataType::LargeUtf8 => {
-            let array = col.as_any().downcast_ref::<LargeStringArray>().unwrap();
-            let value = array.value(row);
-            // store the size
-            vec.extend_from_slice(&value.len().to_le_bytes());
-            // store the string value
-            vec.extend_from_slice(value.as_bytes());
-        }
-        DataType::Date32 => {
-            let array = col.as_any().downcast_ref::<Date32Array>().unwrap();
-            vec.extend_from_slice(&array.value(row).to_le_bytes());
-        }
-        DataType::Dictionary(index_type, _) => match **index_type {
-            DataType::Int8 => {
-                dictionary_create_key_for_col::<Int8Type>(col, row, vec)?;
-            }
-            DataType::Int16 => {
-                dictionary_create_key_for_col::<Int16Type>(col, row, vec)?;
-            }
-            DataType::Int32 => {
-                dictionary_create_key_for_col::<Int32Type>(col, row, vec)?;
-            }
-            DataType::Int64 => {
-                dictionary_create_key_for_col::<Int64Type>(col, row, vec)?;
-            }
-            DataType::UInt8 => {
-                dictionary_create_key_for_col::<UInt8Type>(col, row, vec)?;
-            }
-            DataType::UInt16 => {
-                dictionary_create_key_for_col::<UInt16Type>(col, row, vec)?;
-            }
-            DataType::UInt32 => {
-                dictionary_create_key_for_col::<UInt32Type>(col, row, vec)?;
-            }
-            DataType::UInt64 => {
-                dictionary_create_key_for_col::<UInt64Type>(col, row, vec)?;
-            }
-            _ => {
-                return Err(DataFusionError::Internal(format!(
-                "Unsupported GROUP BY type (dictionary index type not supported creating key) {}",
-                col.data_type(),
-            )))
-            }
-        },
-        _ => {
-            // This is internal because we should have caught this before.
-            return Err(DataFusionError::Internal(format!(
-                "Unsupported GROUP BY type creating key {}",
-                col.data_type(),
-            )));
-        }
-    }
-    Ok(())
-}
-
-/// Create a key `Vec<u8>` that is used as key for the hashmap
-///
-/// This looks like
-/// [null_byte][col_value_bytes][null_byte][col_value_bytes]
-///
-/// Note that relatively uncommon patterns (e.g. not 0x00) are chosen
-/// for the null_byte to make debugging easier. The actual values are
-/// arbitrary.
-///
-/// For a NULL value in a column, the key looks like
-/// [0xFE]
-///
-/// For a Non-NULL value in a column, this looks like:
-/// [0xFF][byte representation of column value]
-///
-/// Example of a key with no NULL values:
-/// ```text
-///                        0xFF byte at the start of each column
-///                           signifies the value is non-null
-///                                          │
-///
-///                      ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ┐
-///
-///                      │        string len                 │  0x1234
-/// {                    ▼       (as usize le)      "foo"    ▼(as u16 le)
-///   k1: "foo"        ╔ ═┌──┬──┬──┬──┬──┬──┬──┬──┬──┬──┬──╦ ═┌──┬──┐
-///   k2: 0x1234u16     FF║03│00│00│00│00│00│00│00│"f│"o│"o│FF║34│12│
-/// }                  ╚ ═└──┴──┴──┴──┴──┴──┴──┴──┴──┴──┴──╩ ═└──┴──┘
-///                     0  1  2  3  4  5  6  7  8  9  10 11 12 13 14
-/// ```
-///
-///  Example of a key with NULL values:
-///
-///```text
-///                         0xFE byte at the start of k1 column
-///                     ┌ ─     signifies the value is NULL
-///
-///                     └ ┐
-///                              0x1234
-/// {                     ▼    (as u16 le)
-///   k1: NULL          ╔ ═╔ ═┌──┬──┐
-///   k2: 0x1234u16      FE║FF║12│34│
-/// }                   ╚ ═╚ ═└──┴──┘
-///                       0  1  2  3
-///```
-pub(crate) fn create_key(
-    group_by_keys: &[ArrayRef],
-    row: usize,
-    vec: &mut Vec<u8>,
-) -> Result<()> {
-    vec.clear();
-    for col in group_by_keys {
-        if !col.is_valid(row) {
-            vec.push(0xFE);
-        } else {
-            vec.push(0xFF);
-            create_key_for_col(col, row, vec)?
-        }
-    }
-    Ok(())
+    Ok(accumulators)
 }
 
 async fn compute_grouped_hash_aggregate(
@@ -720,11 +497,7 @@ async fn compute_grouped_hash_aggregate(
         aggregate_expressions(&aggr_expr, &mode, group_expr.len())
             .map_err(DataFusionError::into_arrow_external_error)?;
 
-    // mapping key -> (set of accumulators, indices of the key in the batch)
-    // * the indexes are updated at each row
-    // * the accumulators are updated at the end of each batch
-    // * the indexes are `clear`ed at the end of each batch
-    //let mut accumulators: Accumulators = FnvHashMap::default();
+    let random_state = RandomState::new();
 
     // iterate over all input batches and update the accumulators
     let mut accumulators = Accumulators::default();
@@ -732,6 +505,7 @@ async fn compute_grouped_hash_aggregate(
         let batch = batch?;
         accumulators = group_aggregate_batch(
             &mode,
+            &random_state,
             &group_expr,
             &aggr_expr,
             batch,
@@ -779,8 +553,47 @@ impl GroupedHashAggregateStream {
 }
 
 type AccumulatorItem = Box<dyn Accumulator>;
-type Accumulators =
-    HashMap<Vec<u8>, (Box<[ScalarValue]>, Vec<AccumulatorItem>, Vec<u32>), RandomState>;
+
+/// The state that is built for each output group.
+#[derive(Debug)]
+struct GroupState {
+    /// The actual group by values, one for each group column
+    group_by_values: Box<[ScalarValue]>,
+
+    // Accumulator state, one for each aggregate
+    accumulator_set: Vec<AccumulatorItem>,
+
+    /// scratch space used to collect indices for input rows in a
+    /// batch that have values to aggregate. Reset on each batch
+    indices: Vec<u32>,
+}
+
+/// The state of all the groups
+#[derive(Default)]
+struct Accumulators {
+    /// Logically maps group values to an index in `group_states`
+    ///
+    /// Uses the raw API of hashbrown to avoid actually storing the
+    /// keys in the table
+    ///
+    /// keys: u64 hashes of the GroupValue
+    /// values: (hash, index into `group_states`)
+    map: RawTable<(u64, usize)>,
+
+    /// State for each group
+    group_states: Vec<GroupState>,
+}
+
+impl std::fmt::Debug for Accumulators {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        // hashes are not stored inline, so we can only print a placeholder
+        let map_string = "RawTable";
+        f.debug_struct("Accumulators")
+            .field("map", &map_string)
+            .field("group_states", &self.group_states)
+            .finish()
+    }
+}
 
 impl Stream for GroupedHashAggregateStream {
     type Item = ArrowResult<RecordBatch>;
@@ -903,6 +716,7 @@ pin_project! {
     }
 }
 
+/// Special case aggregate with no groups
 async fn compute_hash_aggregate(
     mode: AggregateMode,
     schema: SchemaRef,
@@ -1031,10 +845,10 @@ fn create_batch_from_map(
     num_group_expr: usize,
     output_schema: &Schema,
 ) -> ArrowResult<RecordBatch> {
-    if accumulators.is_empty() {
+    if accumulators.group_states.is_empty() {
         return Ok(RecordBatch::new_empty(Arc::new(output_schema.to_owned())));
     }
-    let (_, (_, accs, _)) = accumulators.iter().next().unwrap();
+    let accs = &accumulators.group_states[0].accumulator_set;
     let mut acc_data_types: Vec<usize> = vec![];
 
     // Calculate number/shape of state arrays
@@ -1056,8 +870,9 @@ fn create_batch_from_map(
         .map(|i| {
             ScalarValue::iter_to_array(
                 accumulators
-                    .into_iter()
-                    .map(|(_, (group_by_values, _, _))| group_by_values[i].clone()),
+                    .group_states
+                    .iter()
+                    .map(|group_state| group_state.group_by_values[i].clone()),
             )
         })
         .collect::<Result<Vec<_>>>()
@@ -1068,20 +883,22 @@ fn create_batch_from_map(
         for y in 0..state_len {
             match mode {
                 AggregateMode::Partial => {
-                    let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
-                        |(_, (_, accumulator, _))| {
-                            let x = accumulator[x].state().unwrap();
+                    let res = ScalarValue::iter_to_array(
+                        accumulators.group_states.iter().map(|group_state| {
+                            let x = group_state.accumulator_set[x].state().unwrap();
                             x[y].clone()
-                        },
-                    ))
+                        }),
+                    )
                     .map_err(DataFusionError::into_arrow_external_error)?;
 
                     columns.push(res);
                 }
                 AggregateMode::Final | AggregateMode::FinalPartitioned => {
-                    let res = ScalarValue::iter_to_array(accumulators.into_iter().map(
-                        |(_, (_, accumulator, _))| accumulator[x].evaluate().unwrap(),
-                    ))
+                    let res = ScalarValue::iter_to_array(
+                        accumulators.group_states.iter().map(|group_state| {
+                            group_state.accumulator_set[x].evaluate().unwrap()
+                        }),
+                    )
                     .map_err(DataFusionError::into_arrow_external_error)?;
                     columns.push(res);
                 }
@@ -1140,28 +957,11 @@ fn finalize_aggregation(
     }
 }
 
-/// Extract the value in `col[row]` as a GroupByScalar
-fn create_group_by_value(col: &ArrayRef, row: usize) -> Result<ScalarValue> {
-    ScalarValue::try_from_array(col, row)
-}
-
-/// Extract the values in `group_by_keys` arrow arrays into the target vector
-/// as GroupByScalar values
-pub(crate) fn create_group_by_values(
-    group_by_keys: &[ArrayRef],
-    row: usize,
-    vec: &mut Box<[ScalarValue]>,
-) -> Result<()> {
-    for (i, col) in group_by_keys.iter().enumerate() {
-        vec[i] = create_group_by_value(col, row)?
-    }
-    Ok(())
-}
-
 #[cfg(test)]
 mod tests {
 
-    use arrow::array::Float64Array;
+    use arrow::array::{Float64Array, UInt32Array};
+    use arrow::datatypes::DataType;
 
     use super::*;
     use crate::physical_plan::expressions::{col, Avg};