You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by tu...@apache.org on 2023/12/07 16:10:25 UTC

(arrow-rs) branch master updated: Use Total Ordering for Aggregates and Refactor for Better Auto-Vectorization (#5100)

This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new b06ab13fa2 Use Total Ordering for Aggregates and Refactor for Better Auto-Vectorization (#5100)
b06ab13fa2 is described below

commit b06ab13fa2681624c7d5094004309607b253773b
Author: Jörn Horstmann <gi...@jhorstmann.net>
AuthorDate: Thu Dec 7 17:10:20 2023 +0100

    Use Total Ordering for Aggregates and Refactor for Better Auto-Vectorization (#5100)
    
    * Refactor numeric aggregation kernels to make better use of
    auto-vectorization.
    
    Remove the explicit simd implementations since the autovectorized
    versions are faster on average.
    
    The min/max kernels for floating point numbers now use the total order
    relation.
    
    * Comments and cleanup
    
    * Clippy fixes
    
    * Use largest/smallest bit patterns for float MIN/MAX constants, these differ from the canonical NAN bit pattern
    
    * Add test coverage for aggregating large non-null and float inputs
    
    * Add test with negative NaN
    
    * Rename MIN/MAX constants to make it explicit they use the total order relation
---
 arrow-arith/src/aggregate.rs       | 803 +++++++++++++++++--------------------
 arrow-array/src/arithmetic.rs      |  85 +++-
 arrow-buffer/src/buffer/boolean.rs |   1 +
 3 files changed, 454 insertions(+), 435 deletions(-)

diff --git a/arrow-arith/src/aggregate.rs b/arrow-arith/src/aggregate.rs
index 0dabaa50f5..20ff0711d7 100644
--- a/arrow-arith/src/aggregate.rs
+++ b/arrow-arith/src/aggregate.rs
@@ -20,39 +20,317 @@
 use arrow_array::cast::*;
 use arrow_array::iterator::ArrayIter;
 use arrow_array::*;
-use arrow_buffer::ArrowNativeType;
+use arrow_buffer::{ArrowNativeType, NullBuffer};
 use arrow_data::bit_iterator::try_for_each_valid_idx;
 use arrow_schema::ArrowError;
 use arrow_schema::*;
+use std::borrow::BorrowMut;
 use std::ops::{BitAnd, BitOr, BitXor};
 
-/// Generic test for NaN, the optimizer should be able to remove this for integer types.
-#[inline]
-pub(crate) fn is_nan<T: ArrowNativeType + PartialOrd + Copy>(a: T) -> bool {
-    #[allow(clippy::eq_op)]
-    !(a == a)
+/// An accumulator for primitive numeric values.
+trait NumericAccumulator<T: ArrowNativeTypeOp>: Copy + Default {
+    /// Accumulate a non-null value.
+    fn accumulate(&mut self, value: T);
+    /// Accumulate a nullable value.
+    /// If `valid` is false the `value` should not affect the accumulator state.
+    fn accumulate_nullable(&mut self, value: T, valid: bool);
+    /// Merge another accumulator into this accumulator
+    fn merge(&mut self, other: Self);
+    /// Return the aggregated value.
+    fn finish(&mut self) -> T;
 }
 
-/// Returns the minimum value in the array, according to the natural order.
-/// For floating point arrays any NaN values are considered to be greater than any other non-null value
-#[cfg(not(feature = "simd"))]
-pub fn min<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
-where
-    T: ArrowNumericType,
-    T::Native: ArrowNativeType,
-{
-    min_max_helper::<T::Native, _, _>(array, |a, b| (is_nan(*a) & !is_nan(*b)) || a > b)
+/// Helper for branchlessly selecting either `a` or `b` based on the boolean `m`.
+/// After verifying the generated assembly this can be a simple `if`.
+#[inline(always)]
+fn select<T: Copy>(m: bool, a: T, b: T) -> T {
+    if m {
+        a
+    } else {
+        b
+    }
 }
 
-/// Returns the maximum value in the array, according to the natural order.
-/// For floating point arrays any NaN values are considered to be greater than any other non-null value
-#[cfg(not(feature = "simd"))]
-pub fn max<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
-where
-    T: ArrowNumericType,
-    T::Native: ArrowNativeType,
-{
-    min_max_helper::<T::Native, _, _>(array, |a, b| (!is_nan(*a) & is_nan(*b)) || a < b)
+#[derive(Clone, Copy)]
+struct SumAccumulator<T: ArrowNativeTypeOp> {
+    sum: T,
+}
+
+impl<T: ArrowNativeTypeOp> Default for SumAccumulator<T> {
+    fn default() -> Self {
+        Self { sum: T::ZERO }
+    }
+}
+
+impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for SumAccumulator<T> {
+    fn accumulate(&mut self, value: T) {
+        self.sum = self.sum.add_wrapping(value);
+    }
+
+    fn accumulate_nullable(&mut self, value: T, valid: bool) {
+        let sum = self.sum;
+        self.sum = select(valid, sum.add_wrapping(value), sum)
+    }
+
+    fn merge(&mut self, other: Self) {
+        self.sum = self.sum.add_wrapping(other.sum);
+    }
+
+    fn finish(&mut self) -> T {
+        self.sum
+    }
+}
+
+#[derive(Clone, Copy)]
+struct MinAccumulator<T: ArrowNativeTypeOp> {
+    min: T,
+}
+
+impl<T: ArrowNativeTypeOp> Default for MinAccumulator<T> {
+    fn default() -> Self {
+        Self {
+            min: T::MAX_TOTAL_ORDER,
+        }
+    }
+}
+
+impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for MinAccumulator<T> {
+    fn accumulate(&mut self, value: T) {
+        let min = self.min;
+        self.min = select(value.is_lt(min), value, min);
+    }
+
+    fn accumulate_nullable(&mut self, value: T, valid: bool) {
+        let min = self.min;
+        let is_lt = valid & value.is_lt(min);
+        self.min = select(is_lt, value, min);
+    }
+
+    fn merge(&mut self, other: Self) {
+        self.accumulate(other.min)
+    }
+
+    fn finish(&mut self) -> T {
+        self.min
+    }
+}
+
+#[derive(Clone, Copy)]
+struct MaxAccumulator<T: ArrowNativeTypeOp> {
+    max: T,
+}
+
+impl<T: ArrowNativeTypeOp> Default for MaxAccumulator<T> {
+    fn default() -> Self {
+        Self {
+            max: T::MIN_TOTAL_ORDER,
+        }
+    }
+}
+
+impl<T: ArrowNativeTypeOp> NumericAccumulator<T> for MaxAccumulator<T> {
+    fn accumulate(&mut self, value: T) {
+        let max = self.max;
+        self.max = select(value.is_gt(max), value, max);
+    }
+
+    fn accumulate_nullable(&mut self, value: T, valid: bool) {
+        let max = self.max;
+        let is_gt = value.is_gt(max) & valid;
+        self.max = select(is_gt, value, max);
+    }
+
+    fn merge(&mut self, other: Self) {
+        self.accumulate(other.max)
+    }
+
+    fn finish(&mut self) -> T {
+        self.max
+    }
+}
+
+fn reduce_accumulators<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
+    mut acc: [A; LANES],
+) -> A {
+    assert!(LANES > 0 && LANES.is_power_of_two());
+    let mut len = LANES;
+
+    // attempt at tree reduction, unfortunately llvm does not fully recognize this pattern,
+    // but the generated code is still a little faster than purely sequential reduction for floats.
+    while len >= 2 {
+        let mid = len / 2;
+        let (h, t) = acc[..len].split_at_mut(mid);
+
+        for i in 0..mid {
+            h[i].merge(t[i]);
+        }
+        len /= 2;
+    }
+    acc[0]
+}
+
+#[inline(always)]
+fn aggregate_nonnull_chunk<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
+    acc: &mut [A; LANES],
+    values: &[T; LANES],
+) {
+    for i in 0..LANES {
+        acc[i].accumulate(values[i]);
+    }
+}
+
+#[inline(always)]
+fn aggregate_nullable_chunk<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
+    acc: &mut [A; LANES],
+    values: &[T; LANES],
+    validity: u64,
+) {
+    let mut bit = 1;
+    for i in 0..LANES {
+        acc[i].accumulate_nullable(values[i], (validity & bit) != 0);
+        bit <<= 1;
+    }
+}
+
+fn aggregate_nonnull_simple<T: ArrowNativeTypeOp, A: NumericAccumulator<T>>(values: &[T]) -> T {
+    return values
+        .iter()
+        .copied()
+        .fold(A::default(), |mut a, b| {
+            a.accumulate(b);
+            a
+        })
+        .finish();
+}
+
+#[inline(never)]
+fn aggregate_nonnull_lanes<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
+    values: &[T],
+) -> T {
+    // aggregating into multiple independent accumulators allows the compiler to use vector registers
+    // with a single accumulator the compiler would not be allowed to reorder floating point addition
+    let mut acc = [A::default(); LANES];
+    let mut chunks = values.chunks_exact(LANES);
+    chunks.borrow_mut().for_each(|chunk| {
+        aggregate_nonnull_chunk(&mut acc, chunk[..LANES].try_into().unwrap());
+    });
+
+    let remainder = chunks.remainder();
+    for i in 0..remainder.len() {
+        acc[i].accumulate(remainder[i]);
+    }
+
+    reduce_accumulators(acc).finish()
+}
+
+#[inline(never)]
+fn aggregate_nullable_lanes<T: ArrowNativeTypeOp, A: NumericAccumulator<T>, const LANES: usize>(
+    values: &[T],
+    validity: &NullBuffer,
+) -> T {
+    assert!(LANES > 0 && 64 % LANES == 0);
+    assert_eq!(values.len(), validity.len());
+    // Aggregates `values` into `LANES` accumulators; slots with an unset validity bit are ignored.
+    // Multiple independent accumulators allow the compiler to use vector registers.
+    let mut acc = [A::default(); LANES];
+    // we process 64 bits of validity at a time
+    let mut values_chunks = values.chunks_exact(64);
+    let validity_chunks = validity.inner().bit_chunks();
+    let mut validity_chunks_iter = validity_chunks.iter();
+    // full 64-value chunks, each paired with one u64 word of validity bits
+    values_chunks.borrow_mut().for_each(|chunk| {
+        // SAFETY: values and validity have equal length (asserted above) and we trust the iterator impl
+        let mut validity = unsafe { validity_chunks_iter.next().unwrap_unchecked() };
+        // chunk by vector lanes; `% 64` avoids a shift overflow when LANES == 64
+        chunk.chunks_exact(LANES).for_each(|chunk| {
+            aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity);
+            validity >>= LANES % 64;
+        });
+    });
+
+    let remainder = values_chunks.remainder();
+    if !remainder.is_empty() {
+        let mut validity = validity_chunks.remainder_bits();
+
+        let mut remainder_chunks = remainder.chunks_exact(LANES);
+        remainder_chunks.borrow_mut().for_each(|chunk| {
+            aggregate_nullable_chunk(&mut acc, chunk[..LANES].try_into().unwrap(), validity);
+            validity >>= LANES % 64;
+        });
+
+        let remainder = remainder_chunks.remainder();
+        if !remainder.is_empty() {
+            let mut bit = 1;
+            for i in 0..remainder.len() {
+                acc[i].accumulate_nullable(remainder[i], (validity & bit) != 0);
+                bit <<= 1;
+            }
+        }
+    }
+
+    reduce_accumulators(acc).finish()
+}
+
+/// The preferred vector size in bytes for the target platform.
+/// Note that the avx512 target feature is still unstable and this also means it is not detected on stable rust.
+const PREFERRED_VECTOR_SIZE: usize =
+    if cfg!(all(target_arch = "x86_64", target_feature = "avx512f")) {
+        64
+    } else if cfg!(all(target_arch = "x86_64", target_feature = "avx")) {
+        32
+    } else {
+        16
+    };
+
+/// non-nullable aggregation requires fewer temporary registers so we can use more of them for accumulators
+const PREFERRED_VECTOR_SIZE_NON_NULL: usize = PREFERRED_VECTOR_SIZE * 2;
+
+/// Generic aggregation for any primitive type.
+/// Returns None if there are no non-null values in `array`.
+fn aggregate<T: ArrowNativeTypeOp, P: ArrowPrimitiveType<Native = T>, A: NumericAccumulator<T>>(
+    array: &PrimitiveArray<P>,
+) -> Option<T> {
+    let null_count = array.null_count();
+    if null_count == array.len() {
+        return None;
+    }
+    let values = array.values().as_ref();
+    match array.nulls() {
+        Some(nulls) if null_count > 0 => {
+            // const generics depending on a generic type parameter are not supported
+            // so we have to match and call aggregate with the corresponding constant
+            match PREFERRED_VECTOR_SIZE / std::mem::size_of::<T>() {
+                64 => Some(aggregate_nullable_lanes::<T, A, 64>(values, nulls)),
+                32 => Some(aggregate_nullable_lanes::<T, A, 32>(values, nulls)),
+                16 => Some(aggregate_nullable_lanes::<T, A, 16>(values, nulls)),
+                8 => Some(aggregate_nullable_lanes::<T, A, 8>(values, nulls)),
+                4 => Some(aggregate_nullable_lanes::<T, A, 4>(values, nulls)),
+                2 => Some(aggregate_nullable_lanes::<T, A, 2>(values, nulls)),
+                _ => Some(aggregate_nullable_lanes::<T, A, 1>(values, nulls)),
+            }
+        }
+        _ => {
+            let is_float = matches!(
+                array.data_type(),
+                DataType::Float16 | DataType::Float32 | DataType::Float64
+            );
+            if is_float {
+                match PREFERRED_VECTOR_SIZE_NON_NULL / std::mem::size_of::<T>() {
+                    64 => Some(aggregate_nonnull_lanes::<T, A, 64>(values)),
+                    32 => Some(aggregate_nonnull_lanes::<T, A, 32>(values)),
+                    16 => Some(aggregate_nonnull_lanes::<T, A, 16>(values)),
+                    8 => Some(aggregate_nonnull_lanes::<T, A, 8>(values)),
+                    4 => Some(aggregate_nonnull_lanes::<T, A, 4>(values)),
+                    2 => Some(aggregate_nonnull_lanes::<T, A, 2>(values)),
+                    _ => Some(aggregate_nonnull_simple::<T, A>(values)),
+                }
+            } else {
+                // for non-null integers it's better to not chunk ourselves and instead
+                // let llvm fully handle loop unrolling and vectorization
+                Some(aggregate_nonnull_simple::<T, A>(values))
+            }
+        }
+    }
 }
 
 /// Returns the minimum value in the boolean array.
@@ -230,7 +508,7 @@ where
     T: ArrowNumericType,
     T::Native: ArrowNativeType,
 {
-    min_max_array_helper::<T, A, _, _>(array, |a, b| (is_nan(*a) & !is_nan(*b)) || a > b, min)
+    min_max_array_helper::<T, A, _, _>(array, |a, b| a.is_gt(*b), min)
 }
 
 /// Returns the max of values in the array of `ArrowNumericType` type, or dictionary
@@ -238,9 +516,9 @@ where
 pub fn max_array<T, A: ArrayAccessor<Item = T::Native>>(array: A) -> Option<T::Native>
 where
     T: ArrowNumericType,
-    T::Native: ArrowNativeType,
+    T::Native: ArrowNativeTypeOp,
 {
-    min_max_array_helper::<T, A, _, _>(array, |a, b| (!is_nan(*a) & is_nan(*b)) || a < b, max)
+    min_max_array_helper::<T, A, _, _>(array, |a, b| a.is_lt(*b), max)
 }
 
 fn min_max_array_helper<T, A: ArrayAccessor<Item = T::Native>, F, M>(
@@ -259,66 +537,6 @@ where
     }
 }
 
-/// Returns the sum of values in the primitive array.
-///
-/// Returns `None` if the array is empty or only contains null values.
-///
-/// This doesn't detect overflow. Once overflowing, the result will wrap around.
-/// For an overflow-checking variant, use `sum_checked` instead.
-#[cfg(not(feature = "simd"))]
-pub fn sum<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
-where
-    T: ArrowNumericType,
-    T::Native: ArrowNativeTypeOp,
-{
-    let null_count = array.null_count();
-
-    if null_count == array.len() {
-        return None;
-    }
-
-    let data: &[T::Native] = array.values();
-
-    match array.nulls() {
-        None => {
-            let sum = data.iter().fold(T::default_value(), |accumulator, value| {
-                accumulator.add_wrapping(*value)
-            });
-
-            Some(sum)
-        }
-        Some(nulls) => {
-            let mut sum = T::default_value();
-            let data_chunks = data.chunks_exact(64);
-            let remainder = data_chunks.remainder();
-
-            let bit_chunks = nulls.inner().bit_chunks();
-            data_chunks
-                .zip(bit_chunks.iter())
-                .for_each(|(chunk, mask)| {
-                    // index_mask has value 1 << i in the loop
-                    let mut index_mask = 1;
-                    chunk.iter().for_each(|value| {
-                        if (mask & index_mask) != 0 {
-                            sum = sum.add_wrapping(*value);
-                        }
-                        index_mask <<= 1;
-                    });
-                });
-
-            let remainder_bits = bit_chunks.remainder_bits();
-
-            remainder.iter().enumerate().for_each(|(i, value)| {
-                if remainder_bits & (1 << i) != 0 {
-                    sum = sum.add_wrapping(*value);
-                }
-            });
-
-            Some(sum)
-        }
-    }
-}
-
 macro_rules! bit_operation {
     ($NAME:ident, $OP:ident, $NATIVE:ident, $DEFAULT:expr, $DOC:expr) => {
         #[doc = $DOC]
@@ -476,369 +694,35 @@ where
     }
 }
 
-#[cfg(feature = "simd")]
-mod simd {
-    use super::is_nan;
-    use arrow_array::*;
-    use std::marker::PhantomData;
-
-    pub(super) trait SimdAggregate<T: ArrowNumericType> {
-        type ScalarAccumulator;
-        type SimdAccumulator;
-
-        /// Returns the accumulator for aggregating scalar values
-        fn init_accumulator_scalar() -> Self::ScalarAccumulator;
-
-        /// Returns the accumulator for aggregating simd chunks of values
-        fn init_accumulator_chunk() -> Self::SimdAccumulator;
-
-        /// Updates the accumulator with the values of one chunk
-        fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd);
-
-        /// Updates the accumulator with the values of one chunk according to the given vector mask
-        fn accumulate_chunk_nullable(
-            accumulator: &mut Self::SimdAccumulator,
-            chunk: T::Simd,
-            mask: T::SimdMask,
-        );
-
-        /// Updates the accumulator with one value
-        fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native);
-
-        /// Reduces the vector lanes of the simd accumulator and the scalar accumulator to a single value
-        fn reduce(
-            simd_accumulator: Self::SimdAccumulator,
-            scalar_accumulator: Self::ScalarAccumulator,
-        ) -> Option<T::Native>;
-    }
-
-    pub(super) struct SumAggregate<T: ArrowNumericType> {
-        phantom: PhantomData<T>,
-    }
-
-    impl<T: ArrowNumericType> SimdAggregate<T> for SumAggregate<T>
-    where
-        T::Native: ArrowNativeTypeOp,
-    {
-        type ScalarAccumulator = T::Native;
-        type SimdAccumulator = T::Simd;
-
-        fn init_accumulator_scalar() -> Self::ScalarAccumulator {
-            T::default_value()
-        }
-
-        fn init_accumulator_chunk() -> Self::SimdAccumulator {
-            T::init(Self::init_accumulator_scalar())
-        }
-
-        fn accumulate_chunk_non_null(accumulator: &mut T::Simd, chunk: T::Simd) {
-            *accumulator = *accumulator + chunk;
-        }
-
-        fn accumulate_chunk_nullable(
-            accumulator: &mut T::Simd,
-            chunk: T::Simd,
-            vecmask: T::SimdMask,
-        ) {
-            let zero = T::init(T::default_value());
-            let blended = T::mask_select(vecmask, chunk, zero);
-
-            *accumulator = *accumulator + blended;
-        }
-
-        fn accumulate_scalar(accumulator: &mut T::Native, value: T::Native) {
-            *accumulator = accumulator.add_wrapping(value)
-        }
-
-        fn reduce(
-            simd_accumulator: Self::SimdAccumulator,
-            scalar_accumulator: Self::ScalarAccumulator,
-        ) -> Option<T::Native> {
-            // we can't use T::lanes() as the slice len because it is not const,
-            // instead always reserve the maximum number of lanes
-            let mut tmp = [T::default_value(); 64];
-            let slice = &mut tmp[0..T::lanes()];
-            T::write(simd_accumulator, slice);
-
-            let mut reduced = Self::init_accumulator_scalar();
-            slice
-                .iter()
-                .for_each(|value| Self::accumulate_scalar(&mut reduced, *value));
-
-            Self::accumulate_scalar(&mut reduced, scalar_accumulator);
-
-            // result can not be None because we checked earlier for the null count
-            Some(reduced)
-        }
-    }
-
-    pub(super) struct MinAggregate<T: ArrowNumericType> {
-        phantom: PhantomData<T>,
-    }
-
-    impl<T: ArrowNumericType> SimdAggregate<T> for MinAggregate<T>
-    where
-        T::Native: PartialOrd,
-    {
-        type ScalarAccumulator = (T::Native, bool);
-        type SimdAccumulator = (T::Simd, T::SimdMask);
-
-        fn init_accumulator_scalar() -> Self::ScalarAccumulator {
-            (T::default_value(), false)
-        }
-
-        fn init_accumulator_chunk() -> Self::SimdAccumulator {
-            (T::init(T::default_value()), T::mask_init(false))
-        }
-
-        fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd) {
-            let acc_is_nan = !T::eq(accumulator.0, accumulator.0);
-            let is_lt = acc_is_nan | T::lt(chunk, accumulator.0);
-            let first_or_lt = !accumulator.1 | is_lt;
-
-            accumulator.0 = T::mask_select(first_or_lt, chunk, accumulator.0);
-            accumulator.1 = T::mask_init(true);
-        }
-
-        fn accumulate_chunk_nullable(
-            accumulator: &mut Self::SimdAccumulator,
-            chunk: T::Simd,
-            vecmask: T::SimdMask,
-        ) {
-            let acc_is_nan = !T::eq(accumulator.0, accumulator.0);
-            let is_lt = vecmask & (acc_is_nan | T::lt(chunk, accumulator.0));
-            let first_or_lt = !accumulator.1 | is_lt;
-
-            accumulator.0 = T::mask_select(first_or_lt, chunk, accumulator.0);
-            accumulator.1 |= vecmask;
-        }
-
-        fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native) {
-            if !accumulator.1 {
-                accumulator.0 = value;
-            } else {
-                let acc_is_nan = is_nan(accumulator.0);
-                if acc_is_nan || value < accumulator.0 {
-                    accumulator.0 = value
-                }
-            }
-            accumulator.1 = true
-        }
-
-        fn reduce(
-            simd_accumulator: Self::SimdAccumulator,
-            scalar_accumulator: Self::ScalarAccumulator,
-        ) -> Option<T::Native> {
-            // we can't use T::lanes() as the slice len because it is not const,
-            // instead always reserve the maximum number of lanes
-            let mut tmp = [T::default_value(); 64];
-            let slice = &mut tmp[0..T::lanes()];
-            T::write(simd_accumulator.0, slice);
-
-            let mut reduced = Self::init_accumulator_scalar();
-            slice
-                .iter()
-                .enumerate()
-                .filter(|(i, _value)| T::mask_get(&simd_accumulator.1, *i))
-                .for_each(|(_i, value)| Self::accumulate_scalar(&mut reduced, *value));
-
-            if scalar_accumulator.1 {
-                Self::accumulate_scalar(&mut reduced, scalar_accumulator.0);
-            }
-
-            if reduced.1 {
-                Some(reduced.0)
-            } else {
-                None
-            }
-        }
-    }
-
-    pub(super) struct MaxAggregate<T: ArrowNumericType> {
-        phantom: PhantomData<T>,
-    }
-
-    impl<T: ArrowNumericType> SimdAggregate<T> for MaxAggregate<T>
-    where
-        T::Native: PartialOrd,
-    {
-        type ScalarAccumulator = (T::Native, bool);
-        type SimdAccumulator = (T::Simd, T::SimdMask);
-
-        fn init_accumulator_scalar() -> Self::ScalarAccumulator {
-            (T::default_value(), false)
-        }
-
-        fn init_accumulator_chunk() -> Self::SimdAccumulator {
-            (T::init(T::default_value()), T::mask_init(false))
-        }
-
-        fn accumulate_chunk_non_null(accumulator: &mut Self::SimdAccumulator, chunk: T::Simd) {
-            let chunk_is_nan = !T::eq(chunk, chunk);
-            let is_gt = chunk_is_nan | T::gt(chunk, accumulator.0);
-            let first_or_gt = !accumulator.1 | is_gt;
-
-            accumulator.0 = T::mask_select(first_or_gt, chunk, accumulator.0);
-            accumulator.1 = T::mask_init(true);
-        }
-
-        fn accumulate_chunk_nullable(
-            accumulator: &mut Self::SimdAccumulator,
-            chunk: T::Simd,
-            vecmask: T::SimdMask,
-        ) {
-            let chunk_is_nan = !T::eq(chunk, chunk);
-            let is_gt = vecmask & (chunk_is_nan | T::gt(chunk, accumulator.0));
-            let first_or_gt = !accumulator.1 | is_gt;
-
-            accumulator.0 = T::mask_select(first_or_gt, chunk, accumulator.0);
-            accumulator.1 |= vecmask;
-        }
-
-        fn accumulate_scalar(accumulator: &mut Self::ScalarAccumulator, value: T::Native) {
-            if !accumulator.1 {
-                accumulator.0 = value;
-            } else {
-                let value_is_nan = is_nan(value);
-                if value_is_nan || value > accumulator.0 {
-                    accumulator.0 = value
-                }
-            }
-            accumulator.1 = true;
-        }
-
-        fn reduce(
-            simd_accumulator: Self::SimdAccumulator,
-            scalar_accumulator: Self::ScalarAccumulator,
-        ) -> Option<T::Native> {
-            // we can't use T::lanes() as the slice len because it is not const,
-            // instead always reserve the maximum number of lanes
-            let mut tmp = [T::default_value(); 64];
-            let slice = &mut tmp[0..T::lanes()];
-            T::write(simd_accumulator.0, slice);
-
-            let mut reduced = Self::init_accumulator_scalar();
-            slice
-                .iter()
-                .enumerate()
-                .filter(|(i, _value)| T::mask_get(&simd_accumulator.1, *i))
-                .for_each(|(_i, value)| Self::accumulate_scalar(&mut reduced, *value));
-
-            if scalar_accumulator.1 {
-                Self::accumulate_scalar(&mut reduced, scalar_accumulator.0);
-            }
-
-            if reduced.1 {
-                Some(reduced.0)
-            } else {
-                None
-            }
-        }
-    }
-
-    pub(super) fn simd_aggregation<T: ArrowNumericType, A: SimdAggregate<T>>(
-        array: &PrimitiveArray<T>,
-    ) -> Option<T::Native> {
-        let null_count = array.null_count();
-
-        if null_count == array.len() {
-            return None;
-        }
-
-        let data: &[T::Native] = array.values();
-
-        let mut chunk_acc = A::init_accumulator_chunk();
-        let mut rem_acc = A::init_accumulator_scalar();
-
-        match array.nulls() {
-            None => {
-                let data_chunks = data.chunks_exact(64);
-                let remainder = data_chunks.remainder();
-
-                data_chunks.for_each(|chunk| {
-                    chunk.chunks_exact(T::lanes()).for_each(|chunk| {
-                        let chunk = T::load(&chunk);
-                        A::accumulate_chunk_non_null(&mut chunk_acc, chunk);
-                    });
-                });
-
-                remainder.iter().for_each(|value| {
-                    A::accumulate_scalar(&mut rem_acc, *value);
-                });
-            }
-            Some(nulls) => {
-                // process data in chunks of 64 elements since we also get 64 bits of validity information at a time
-                let data_chunks = data.chunks_exact(64);
-                let remainder = data_chunks.remainder();
-
-                let bit_chunks = nulls.inner().bit_chunks();
-                let remainder_bits = bit_chunks.remainder_bits();
-
-                data_chunks.zip(bit_chunks).for_each(|(chunk, mut mask)| {
-                    // split chunks further into slices corresponding to the vector length
-                    // the compiler is able to unroll this inner loop and remove bounds checks
-                    // since the outer chunk size (64) is always a multiple of the number of lanes
-                    chunk.chunks_exact(T::lanes()).for_each(|chunk| {
-                        let vecmask = T::mask_from_u64(mask);
-                        let chunk = T::load(&chunk);
-
-                        A::accumulate_chunk_nullable(&mut chunk_acc, chunk, vecmask);
-
-                        // skip the shift and avoid overflow for u8 type, which uses 64 lanes.
-                        mask >>= T::lanes() % 64;
-                    });
-                });
-
-                remainder.iter().enumerate().for_each(|(i, value)| {
-                    if remainder_bits & (1 << i) != 0 {
-                        A::accumulate_scalar(&mut rem_acc, *value)
-                    }
-                });
-            }
-        }
-
-        A::reduce(chunk_acc, rem_acc)
-    }
-}
-
 /// Returns the sum of values in the primitive array.
 ///
 /// Returns `None` if the array is empty or only contains null values.
 ///
 /// This doesn't detect overflow in release mode by default. Once overflowing, the result will
 /// wrap around. For an overflow-checking variant, use `sum_checked` instead.
-#[cfg(feature = "simd")]
 pub fn sum<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
 where
     T::Native: ArrowNativeTypeOp,
 {
-    use simd::*;
-
-    simd::simd_aggregation::<T, SumAggregate<T>>(&array)
+    aggregate::<T::Native, T, SumAccumulator<T::Native>>(array)
 }
 
-#[cfg(feature = "simd")]
 /// Returns the minimum value in the array, according to the natural order.
 /// For floating point arrays any NaN values are considered to be greater than any other non-null value
 pub fn min<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
 where
     T::Native: PartialOrd,
 {
-    use simd::*;
-
-    simd::simd_aggregation::<T, MinAggregate<T>>(&array)
+    aggregate::<T::Native, T, MinAccumulator<T::Native>>(array)
 }
 
-#[cfg(feature = "simd")]
 /// Returns the maximum value in the array, according to the natural order.
 /// For floating point arrays any NaN values are considered to be greater than any other non-null value
 pub fn max<T: ArrowNumericType>(array: &PrimitiveArray<T>) -> Option<T::Native>
 where
     T::Native: PartialOrd,
 {
-    use simd::*;
-
-    simd::simd_aggregation::<T, MaxAggregate<T>>(&array)
+    aggregate::<T::Native, T, MaxAccumulator<T::Native>>(array)
 }
 
 #[cfg(test)]
@@ -872,8 +756,41 @@ mod tests {
         assert_eq!(None, sum(&a));
     }
 
+    #[test]
+    fn test_primitive_array_sum_large_float_64() {
+        let c = Float64Array::new((1..=100).map(|x| x as f64).collect(), None);
+        assert_eq!(Some((1..=100).sum::<i64>() as f64), sum(&c));
+
+        // create an array that actually has non-zero values at the invalid indices
+        let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
+        let c = Float64Array::new((1..=100).map(|x| x as f64).collect(), Some(validity));
+
+        assert_eq!(
+            Some((1..=100).filter(|i| i % 3 == 0).sum::<i64>() as f64),
+            sum(&c)
+        );
+    }
+
+    #[test]
+    fn test_primitive_array_sum_large_float_32() {
+        let c = Float32Array::new((1..=100).map(|x| x as f32).collect(), None);
+        assert_eq!(Some((1..=100).sum::<i64>() as f32), sum(&c));
+
+        // create an array that actually has non-zero values at the invalid indices
+        let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
+        let c = Float32Array::new((1..=100).map(|x| x as f32).collect(), Some(validity));
+
+        assert_eq!(
+            Some((1..=100).filter(|i| i % 3 == 0).sum::<i64>() as f32),
+            sum(&c)
+        );
+    }
+
     #[test]
     fn test_primitive_array_sum_large_64() {
+        let c = Int64Array::new((1..=100).collect(), None);
+        assert_eq!(Some((1..=100).sum()), sum(&c));
+
         // create an array that actually has non-zero values at the invalid indices
         let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
         let c = Int64Array::new((1..=100).collect(), Some(validity));
@@ -883,6 +800,9 @@ mod tests {
 
     #[test]
     fn test_primitive_array_sum_large_32() {
+        let c = Int32Array::new((1..=100).collect(), None);
+        assert_eq!(Some((1..=100).sum()), sum(&c));
+
         // create an array that actually has non-zero values at the invalid indices
         let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
         let c = Int32Array::new((1..=100).collect(), Some(validity));
@@ -891,6 +811,9 @@ mod tests {
 
     #[test]
     fn test_primitive_array_sum_large_16() {
+        let c = Int16Array::new((1..=100).collect(), None);
+        assert_eq!(Some((1..=100).sum()), sum(&c));
+
         // create an array that actually has non-zero values at the invalid indices
         let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
         let c = Int16Array::new((1..=100).collect(), Some(validity));
@@ -899,11 +822,23 @@ mod tests {
 
     #[test]
     fn test_primitive_array_sum_large_8() {
-        // include fewer values than other large tests so the result does not overflow the u8
+        let c = UInt8Array::new((1..=100).collect(), None);
+        assert_eq!(
+            Some((1..=100).fold(0_u8, |a, x| a.wrapping_add(x))),
+            sum(&c)
+        );
+
         // create an array that actually has non-zero values at the invalid indices
-        let validity = NullBuffer::new((1..=100).map(|x| x % 33 == 0).collect());
+        let validity = NullBuffer::new((1..=100).map(|x| x % 3 == 0).collect());
         let c = UInt8Array::new((1..=100).collect(), Some(validity));
-        assert_eq!(Some((1..=100).filter(|i| i % 33 == 0).sum()), sum(&c));
+        assert_eq!(
+            Some(
+                (1..=100)
+                    .filter(|i| i % 3 == 0)
+                    .fold(0_u8, |a, x| a.wrapping_add(x))
+            ),
+            sum(&c)
+        );
     }
 
     #[test]
@@ -1103,6 +1038,19 @@ mod tests {
         assert!(min(&a).unwrap().is_nan());
     }
 
+    #[test]
+    fn test_primitive_min_max_float_negative_nan() {
+        let a: Float64Array =
+            Float64Array::from(vec![f64::NEG_INFINITY, f64::NAN, f64::INFINITY, -f64::NAN]);
+        let max = max(&a).unwrap();
+        let min = min(&a).unwrap();
+        assert!(max.is_nan());
+        assert!(max.is_sign_positive());
+
+        assert!(min.is_nan());
+        assert!(min.is_sign_negative());
+    }
+
     #[test]
     fn test_primitive_min_max_float_first_nan_nonnull() {
         let a: Float64Array = (0..100)
@@ -1455,7 +1403,6 @@ mod tests {
     }
 
     #[test]
-    #[cfg(not(feature = "simd"))]
     fn test_sum_overflow() {
         let a = Int32Array::from(vec![i32::MAX, 1]);
 
diff --git a/arrow-array/src/arithmetic.rs b/arrow-array/src/arithmetic.rs
index c9be39d441..5905361903 100644
--- a/arrow-array/src/arithmetic.rs
+++ b/arrow-array/src/arithmetic.rs
@@ -45,6 +45,16 @@ pub trait ArrowNativeTypeOp: ArrowNativeType {
     /// The multiplicative identity
     const ONE: Self;
 
+    /// The minimum value and identity for the `max` aggregation.
+    /// Note that the aggregation uses the total order predicate for floating point values,
+    /// which means that this value is a negative NaN.
+    const MIN_TOTAL_ORDER: Self;
+
+    /// The maximum value and identity for the `min` aggregation.
+    /// Note that the aggregation uses the total order predicate for floating point values,
+    /// which means that this value is a positive NaN.
+    const MAX_TOTAL_ORDER: Self;
+
     /// Checked addition operation
     fn add_checked(self, rhs: Self) -> Result<Self, ArrowError>;
 
@@ -129,12 +139,14 @@ pub trait ArrowNativeTypeOp: ArrowNativeType {
 
 macro_rules! native_type_op {
     ($t:tt) => {
-        native_type_op!($t, 0, 1);
+        native_type_op!($t, 0, 1, $t::MIN, $t::MAX);
     };
-    ($t:tt, $zero:expr, $one: expr) => {
+    ($t:tt, $zero:expr, $one: expr, $min: expr, $max: expr) => {
         impl ArrowNativeTypeOp for $t {
             const ZERO: Self = $zero;
             const ONE: Self = $one;
+            const MIN_TOTAL_ORDER: Self = $min;
+            const MAX_TOTAL_ORDER: Self = $max;
 
             #[inline]
             fn add_checked(self, rhs: Self) -> Result<Self, ArrowError> {
@@ -270,13 +282,15 @@ native_type_op!(u8);
 native_type_op!(u16);
 native_type_op!(u32);
 native_type_op!(u64);
-native_type_op!(i256, i256::ZERO, i256::ONE);
+native_type_op!(i256, i256::ZERO, i256::ONE, i256::MIN, i256::MAX);
 
 macro_rules! native_type_float_op {
-    ($t:tt, $zero:expr, $one:expr) => {
+    ($t:tt, $zero:expr, $one:expr, $min:expr, $max:expr) => {
         impl ArrowNativeTypeOp for $t {
             const ZERO: Self = $zero;
             const ONE: Self = $one;
+            const MIN_TOTAL_ORDER: Self = $min;
+            const MAX_TOTAL_ORDER: Self = $max;
 
             #[inline]
             fn add_checked(self, rhs: Self) -> Result<Self, ArrowError> {
@@ -377,9 +391,30 @@ macro_rules! native_type_float_op {
     };
 }
 
-native_type_float_op!(f16, f16::ZERO, f16::ONE);
-native_type_float_op!(f32, 0., 1.);
-native_type_float_op!(f64, 0., 1.);
+// Under the total order the smallest/largest floats are NaNs with every payload bit set (sign bit set/clear
+// respectively), which differ from the canonical NAN constants. See test_float_total_order_min_max for details.
+native_type_float_op!(
+    f16,
+    f16::ZERO,
+    f16::ONE,
+    f16::from_bits(-1 as _),
+    f16::from_bits(i16::MAX as _)
+);
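+// transmute is used because from_bits is not yet stable as const fn (see https://github.com/rust-lang/rust/issues/72447); it is sound because every integer bit pattern of the same width is a valid float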
+native_type_float_op!(
+    f32,
+    0.,
+    1.,
+    unsafe { std::mem::transmute(-1_i32) },
+    unsafe { std::mem::transmute(i32::MAX) }
+);
+native_type_float_op!(
+    f64,
+    0.,
+    1.,
+    unsafe { std::mem::transmute(-1_i64) },
+    unsafe { std::mem::transmute(i64::MAX) }
+);
 
 #[cfg(test)]
 mod tests {
@@ -780,4 +815,40 @@ mod tests {
         assert_eq!(8.0_f32.pow_checked(2_u32).unwrap(), 64_f32);
         assert_eq!(8.0_f64.pow_checked(2_u32).unwrap(), 64_f64);
     }
+
+    #[test]
+    fn test_float_total_order_min_max() {
+        assert!(<f64 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(f64::NEG_INFINITY));
+        assert!(<f64 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f64::INFINITY));
+
+        assert!(<f64 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_nan());
+        assert!(<f64 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_sign_negative());
+        assert!(<f64 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(-f64::NAN));
+
+        assert!(<f64 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_nan());
+        assert!(<f64 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_sign_positive());
+        assert!(<f64 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f64::NAN));
+
+        assert!(<f32 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(f32::NEG_INFINITY));
+        assert!(<f32 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f32::INFINITY));
+
+        assert!(<f32 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_nan());
+        assert!(<f32 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_sign_negative());
+        assert!(<f32 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(-f32::NAN));
+
+        assert!(<f32 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_nan());
+        assert!(<f32 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_sign_positive());
+        assert!(<f32 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f32::NAN));
+
+        assert!(<f16 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(f16::NEG_INFINITY));
+        assert!(<f16 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f16::INFINITY));
+
+        assert!(<f16 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_nan());
+        assert!(<f16 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_sign_negative());
+        assert!(<f16 as ArrowNativeTypeOp>::MIN_TOTAL_ORDER.is_lt(-f16::NAN));
+
+        assert!(<f16 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_nan());
+        assert!(<f16 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_sign_positive());
+        assert!(<f16 as ArrowNativeTypeOp>::MAX_TOTAL_ORDER.is_gt(f16::NAN));
+    }
 }
diff --git a/arrow-buffer/src/buffer/boolean.rs b/arrow-buffer/src/buffer/boolean.rs
index c651edcad9..1589cc5b10 100644
--- a/arrow-buffer/src/buffer/boolean.rs
+++ b/arrow-buffer/src/buffer/boolean.rs
@@ -90,6 +90,7 @@ impl BooleanBuffer {
 
     /// Returns a `BitChunks` instance which can be used to iterate over
     /// this buffer's bits in `u64` chunks
+    #[inline]
     pub fn bit_chunks(&self) -> BitChunks {
         BitChunks::new(self.values(), self.offset, self.len)
     }