You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/07/25 05:04:12 UTC

[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

jorgecarleitao commented on a change in pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#discussion_r460365103



##########
File path: rust/arrow/src/compute/kernels/filter.rs
##########
@@ -17,139 +17,443 @@
 
 //! Defines miscellaneous array kernels.
 
-use std::sync::Arc;
-
 use crate::array::*;
 use crate::datatypes::{ArrowNumericType, DataType, TimeUnit};
 use crate::error::{ArrowError, Result};
+use crate::record_batch::RecordBatch;
+use crate::{
+    bitmap::Bitmap,
+    buffer::{Buffer, MutableBuffer},
+    memory,
+    util::bit_util,
+};
+use std::{mem, sync::Arc};
 
-/// Helper function to perform boolean lambda function on values from two arrays.
-fn bool_op<T, F>(
-    left: &PrimitiveArray<T>,
-    right: &PrimitiveArray<T>,
-    op: F,
-) -> Result<BooleanArray>
-where
-    T: ArrowNumericType,
-    F: Fn(Option<T::Native>, Option<T::Native>) -> bool,
-{
-    if left.len() != right.len() {
-        return Err(ArrowError::ComputeError(
-            "Cannot perform math operation on arrays of different length".to_string(),
-        ));
+/// trait for copying filtered null bitmap bits
+trait CopyNullBit {
+    fn copy_null_bit(&mut self, source_index: usize);
+    fn null_count(&self) -> usize;
+    fn null_buffer(&mut self) -> Buffer;
+}
+
+/// no-op null bitmap copy implementation,
+/// used when the filtered data array doesn't have a null bitmap
+struct NullBitNoop {}
+
+impl NullBitNoop {
+    fn new() -> Self {
+        NullBitNoop {}
     }
-    let mut b = BooleanArray::builder(left.len());
-    for i in 0..left.len() {
-        let index = i;
-        let l = if left.is_null(i) {
-            None
-        } else {
-            Some(left.value(index))
-        };
-        let r = if right.is_null(i) {
-            None
-        } else {
-            Some(right.value(index))
-        };
-        b.append_value(op(l, r))?;
+}
+
+impl CopyNullBit for NullBitNoop {
+    #[inline]
+    fn copy_null_bit(&mut self, _source_index: usize) {
+        // do nothing
+    }
+
+    fn null_count(&self) -> usize {
+        0
+    }
+
+    fn null_buffer(&mut self) -> Buffer {
+        Buffer::from([0u8; 0])
     }
-    Ok(b.finish())
 }
 
-macro_rules! filter_array {
-    ($array:expr, $filter:expr, $array_type:ident) => {{
-        let b = $array.as_any().downcast_ref::<$array_type>().unwrap();
-        let mut builder = $array_type::builder(b.len());
-        for i in 0..b.len() {
-            if $filter.value(i) {
-                if b.is_null(i) {
-                    builder.append_null()?;
-                } else {
-                    builder.append_value(b.value(i))?;
-                }
-            }
-        }
-        Ok(Arc::new(builder.finish()))
-    }};
+/// null bitmap copy implementation,
+/// used when the filtered data array has a null bitmap
+struct NullBitSetter<'a> {
+    target_buffer: MutableBuffer,
+    source_bytes: &'a [u8],
+    target_index: usize,
+    null_count: usize,
 }
 
-/// Returns the array, taking only the elements matching the filter
-pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
-    match array.data_type() {
-        DataType::UInt8 => filter_array!(array, filter, UInt8Array),
-        DataType::UInt16 => filter_array!(array, filter, UInt16Array),
-        DataType::UInt32 => filter_array!(array, filter, UInt32Array),
-        DataType::UInt64 => filter_array!(array, filter, UInt64Array),
-        DataType::Int8 => filter_array!(array, filter, Int8Array),
-        DataType::Int16 => filter_array!(array, filter, Int16Array),
-        DataType::Int32 => filter_array!(array, filter, Int32Array),
-        DataType::Int64 => filter_array!(array, filter, Int64Array),
-        DataType::Float32 => filter_array!(array, filter, Float32Array),
-        DataType::Float64 => filter_array!(array, filter, Float64Array),
-        DataType::Boolean => filter_array!(array, filter, BooleanArray),
-        DataType::Date32(_) => filter_array!(array, filter, Date32Array),
-        DataType::Date64(_) => filter_array!(array, filter, Date64Array),
-        DataType::Time32(TimeUnit::Second) => {
-            filter_array!(array, filter, Time32SecondArray)
-        }
-        DataType::Time32(TimeUnit::Millisecond) => {
-            filter_array!(array, filter, Time32MillisecondArray)
-        }
-        DataType::Time64(TimeUnit::Microsecond) => {
-            filter_array!(array, filter, Time64MicrosecondArray)
-        }
-        DataType::Time64(TimeUnit::Nanosecond) => {
-            filter_array!(array, filter, Time64NanosecondArray)
-        }
-        DataType::Duration(TimeUnit::Second) => {
-            filter_array!(array, filter, DurationSecondArray)
-        }
-        DataType::Duration(TimeUnit::Millisecond) => {
-            filter_array!(array, filter, DurationMillisecondArray)
+impl<'a> NullBitSetter<'a> {
+    fn new(null_bitmap: &'a Bitmap) -> Self {
+        let null_bytes = null_bitmap.buffer_ref().data();
+        // create null bitmap buffer with same length and initialize null bitmap buffer to 1s
+        let null_buffer =
+            MutableBuffer::new(null_bytes.len()).with_bitset(null_bytes.len(), true);
+        NullBitSetter {
+            source_bytes: null_bytes,
+            target_buffer: null_buffer,
+            target_index: 0,
+            null_count: 0,
         }
-        DataType::Duration(TimeUnit::Microsecond) => {
-            filter_array!(array, filter, DurationMicrosecondArray)
-        }
-        DataType::Duration(TimeUnit::Nanosecond) => {
-            filter_array!(array, filter, DurationNanosecondArray)
-        }
-        DataType::Timestamp(TimeUnit::Second, _) => {
-            filter_array!(array, filter, TimestampSecondArray)
+    }
+}
+
+impl<'a> CopyNullBit for NullBitSetter<'a> {
+    #[inline]
+    fn copy_null_bit(&mut self, source_index: usize) {
+        if !bit_util::get_bit(self.source_bytes, source_index) {
+            // this is not actually unsafe because of the condition above + target_buffer.len() == source_bytes.len()
+            unsafe {
+                bit_util::unset_bit_raw(
+                    self.target_buffer.raw_data_mut(),
+                    self.target_index,
+                );
+            }
+            self.null_count += 1;
         }
-        DataType::Timestamp(TimeUnit::Millisecond, _) => {
-            filter_array!(array, filter, TimestampMillisecondArray)
+        self.target_index += 1;
+    }
+
+    fn null_count(&self) -> usize {
+        self.null_count
+    }
+
+    fn null_buffer(&mut self) -> Buffer {
+        self.target_buffer.resize(self.target_index).unwrap();
+        // use mem::replace to detach self.target_buffer from self so that it can be returned
+        let target_buffer = mem::replace(&mut self.target_buffer, MutableBuffer::new(0));
+        target_buffer.freeze()
+    }
+}
+
+fn get_null_bit_setter<'a>(data_array: &'a impl Array) -> Box<CopyNullBit + 'a> {
+    if let Some(null_bitmap) = data_array.data_ref().null_bitmap() {
+        // only return an actual null bit copy implementation if null_bitmap is set
+        Box::new(NullBitSetter::new(null_bitmap))
+    } else {
+        // otherwise return a no-op copy null bit implementation
+        // for improved performance when the filtered array doesn't contain NULLs
+        Box::new(NullBitNoop::new())
+    }
+}
+
+// transmute filter array to u64
+// - optimize filtering with highly selective filters by skipping entire batches of 64 filter bits
+// - if the data array being filtered doesn't have a null bitmap, no time is wasted to copy a null bitmap
+fn filter_array_impl(
+    filter_context: &FilterContext,
+    data_array: &impl Array,
+    array_type: DataType,
+    value_size: usize,
+) -> Result<ArrayDataBuilder> {
+    if filter_context.filter_len > data_array.len() {
+        return Err(ArrowError::ComputeError(
+            "Filter array cannot be larger than data array".to_string(),
+        ));
+    }
+    let filtered_count = filter_context.filtered_count;
+    let filter_mask = &filter_context.filter_mask;
+    let filter_u64 = &filter_context.filter_u64;
+    let data_start = data_array.data_ref().buffers()[0].raw_data();
+    let mut value_buffer = MutableBuffer::new(filtered_count * value_size);
+    value_buffer.resize(filtered_count * value_size)?;
+    let mut value_position = value_buffer.raw_data_mut();
+    let mut null_bit_setter = get_null_bit_setter(data_array);
+    let null_bit_setter = null_bit_setter.as_mut();
+
+    for (i, filter_batch) in filter_u64.iter().enumerate() {
+        // foreach u64 batch
+        let filter_batch = *filter_batch;
+        if filter_batch == 0 {
+            // if batch == 0: skip
+            continue;
         }
-        DataType::Timestamp(TimeUnit::Microsecond, _) => {
-            filter_array!(array, filter, TimestampMicrosecondArray)
+        for (j, filter_mask) in filter_mask.iter().enumerate() {
+            // foreach bit in batch:
+            if (filter_batch & *filter_mask) != 0 {
+                let data_index = (i * 64) + j;
+                null_bit_setter.copy_null_bit(data_index);
+                // if filter bit == 1: copy data value to temp array
+                unsafe {
+                    // this should be safe because of the data_array.len() check at the beginning of the method
+                    memory::memcpy(
+                        value_position,
+                        data_start.add(value_size * data_index),
+                        value_size,
+                    );
+                    value_position = value_position.add(value_size);
+                }
+            }
         }
-        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-            filter_array!(array, filter, TimestampNanosecondArray)
+    }
+
+    let mut array_data_builder = ArrayDataBuilder::new(array_type)
+        .len(filtered_count)
+        .add_buffer(value_buffer.freeze());
+    if null_bit_setter.null_count() > 0 {
+        array_data_builder = array_data_builder
+            .null_count(null_bit_setter.null_count())
+            .null_bit_buffer(null_bit_setter.null_buffer());
+    }
+
+    Ok(array_data_builder)
+}
+
+/// FilterContext can be used to improve performance when
+/// filtering multiple data arrays with the same filter array.
+#[derive(Debug)]
+pub struct FilterContext {
+    filter_u64: Vec<u64>,
+    filter_len: usize,
+    filtered_count: usize,
+    filter_mask: Vec<u64>,
+}
+
+macro_rules! filter_primitive_array {
+    ($context:expr, $array:expr, $array_type:ident) => {{
+        let input_array = $array.as_any().downcast_ref::<$array_type>().unwrap();
+        let output_array = $context.filter_primitive_array(input_array)?;
+        Ok(Arc::new(output_array))
+    }};
+}
+
+macro_rules! filter_dictionary_array {
+    ($context:expr, $array:expr, $array_type:ident) => {{
+        let input_array = $array.as_any().downcast_ref::<$array_type>().unwrap();
+        let output_array = $context.filter_dictionary_array(input_array)?;
+        Ok(Arc::new(output_array))
+    }};
+}
+
+impl FilterContext {
+    /// Returns a new instance of FilterContext
+    pub fn new(filter_array: &BooleanArray) -> Self {
+        let filter_mask: Vec<u64> = (0..64).map(|x| 1u64 << x).collect();
+        let filter_bytes = filter_array.data_ref().buffers()[0].data();
+        let filtered_count = bit_util::count_set_bits(filter_bytes);
+        // transmute filter_bytes to &[u64]
+        let mut u64_buffer = MutableBuffer::new(filter_bytes.len());
+        u64_buffer
+            .write_bytes(filter_bytes, u64_buffer.capacity() - filter_bytes.len())
+            .unwrap();
+        let filter_u64 = u64_buffer.typed_data_mut::<u64>().to_owned();
+        FilterContext {
+            filter_u64,
+            filter_len: filter_array.len(),
+            filtered_count,
+            filter_mask,
         }
-        DataType::Binary => {
-            let b = array.as_any().downcast_ref::<BinaryArray>().unwrap();
-            let mut values: Vec<&[u8]> = Vec::with_capacity(b.len());
-            for i in 0..b.len() {
-                if filter.value(i) {
-                    values.push(b.value(i));
+    }
+
+    /// Returns a new array, containing only the elements matching the filter
+    pub fn filter(&self, array: &Array) -> Result<ArrayRef> {
+        match array.data_type() {
+            DataType::UInt8 => filter_primitive_array!(self, array, UInt8Array),
+            DataType::UInt16 => filter_primitive_array!(self, array, UInt16Array),
+            DataType::UInt32 => filter_primitive_array!(self, array, UInt32Array),
+            DataType::UInt64 => filter_primitive_array!(self, array, UInt64Array),
+            DataType::Int8 => filter_primitive_array!(self, array, Int8Array),
+            DataType::Int16 => filter_primitive_array!(self, array, Int16Array),
+            DataType::Int32 => filter_primitive_array!(self, array, Int32Array),
+            DataType::Int64 => filter_primitive_array!(self, array, Int64Array),
+            DataType::Float32 => filter_primitive_array!(self, array, Float32Array),
+            DataType::Float64 => filter_primitive_array!(self, array, Float64Array),
+            DataType::Boolean => {
+                let input_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+                let mut builder = BooleanArray::builder(self.filtered_count);
+                for i in 0..self.filter_u64.len() {
+                    // foreach u64 batch
+                    let filter_batch = self.filter_u64[i];
+                    if filter_batch == 0 {
+                        // if batch == 0: skip
+                        continue;
+                    }
+                    for j in 0..64 {
+                        // foreach bit in batch:
+                        if (filter_batch & self.filter_mask[j]) != 0 {
+                            let data_index = (i * 64) + j;
+                            if input_array.is_null(data_index) {
+                                builder.append_null()?;
+                            } else {
+                                builder.append_value(input_array.value(data_index))?;
+                            }
+                        }
+                    }
                 }
+                Ok(Arc::new(builder.finish()))
+            },
+            DataType::Date32(_) => filter_primitive_array!(self, array, Date32Array),
+            DataType::Date64(_) => filter_primitive_array!(self, array, Date64Array),
+            DataType::Time32(TimeUnit::Second) => {
+                filter_primitive_array!(self, array, Time32SecondArray)
             }
-            Ok(Arc::new(BinaryArray::from(values)))
-        }
-        DataType::Utf8 => {
-            let b = array.as_any().downcast_ref::<StringArray>().unwrap();
-            let mut values: Vec<&str> = Vec::with_capacity(b.len());
-            for i in 0..b.len() {
-                if filter.value(i) {
-                    values.push(b.value(i));
+            DataType::Time32(TimeUnit::Millisecond) => {
+                filter_primitive_array!(self, array, Time32MillisecondArray)
+            }
+            DataType::Time64(TimeUnit::Microsecond) => {
+                filter_primitive_array!(self, array, Time64MicrosecondArray)
+            }
+            DataType::Time64(TimeUnit::Nanosecond) => {
+                filter_primitive_array!(self, array, Time64NanosecondArray)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                filter_primitive_array!(self, array, DurationSecondArray)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                filter_primitive_array!(self, array, DurationMillisecondArray)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                filter_primitive_array!(self, array, DurationMicrosecondArray)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                filter_primitive_array!(self, array, DurationNanosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Second, _) => {
+                filter_primitive_array!(self, array, TimestampSecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, _) => {
+                filter_primitive_array!(self, array, TimestampMillisecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Microsecond, _) => {
+                filter_primitive_array!(self, array, TimestampMicrosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                filter_primitive_array!(self, array, TimestampNanosecondArray)
+            }
+            DataType::Binary => {
+                let input_array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
+                let mut values: Vec<&[u8]> = Vec::with_capacity(self.filtered_count);
+                for i in 0..self.filter_u64.len() {
+                    // foreach u64 batch
+                    let filter_batch = self.filter_u64[i];
+                    if filter_batch == 0 {
+                        // if batch == 0: skip
+                        continue;
+                    }
+                    for j in 0..64 {
+                        // foreach bit in batch:
+                        if (filter_batch & self.filter_mask[j]) != 0 {
+                            let data_index = (i * 64) + j;
+                            values.push(input_array.value(data_index));
+                        }
+                    }
                 }
+                Ok(Arc::new(BinaryArray::from(values)))
             }
-            Ok(Arc::new(StringArray::from(values)))
+            DataType::Utf8 => {
+                let input_array = array.as_any().downcast_ref::<StringArray>().unwrap();
+                let mut values: Vec<&str> = Vec::with_capacity(self.filtered_count);
+                for i in 0..self.filter_u64.len() {
+                    // foreach u64 batch
+                    let filter_batch = self.filter_u64[i];
+                    if filter_batch == 0 {
+                        // if batch == 0: skip
+                        continue;
+                    }
+                    for j in 0..64 {
+                        // foreach bit in batch:
+                        if (filter_batch & self.filter_mask[j]) != 0 {
+                            let data_index = (i * 64) + j;
+                            values.push(input_array.value(data_index));
+                        }
+                    }
+                }
+                Ok(Arc::new(StringArray::from(values)))
+            }
+            DataType::Dictionary(ref key_type, ref value_type) => match (key_type.as_ref(), value_type.as_ref()) {
+                (key_type, DataType::Utf8) => match key_type {
+                    DataType::UInt8 => filter_dictionary_array!(self, array, UInt8DictionaryArray),
+                    DataType::UInt16 => filter_dictionary_array!(self, array, UInt16DictionaryArray),
+                    DataType::UInt32 => filter_dictionary_array!(self, array, UInt32DictionaryArray),
+                    DataType::UInt64 => filter_dictionary_array!(self, array, UInt64DictionaryArray),
+                    DataType::Int8 => filter_dictionary_array!(self, array, Int8DictionaryArray),
+                    DataType::Int16 => filter_dictionary_array!(self, array, Int16DictionaryArray),
+                    DataType::Int32 => filter_dictionary_array!(self, array, Int32DictionaryArray),
+                    DataType::Int64 => filter_dictionary_array!(self, array, Int64DictionaryArray),
+                    other => Err(ArrowError::ComputeError(format!(
+                        "filter not supported for string dictionary with key of type {:?}",
+                        other
+                    )))
+                }
+                (key_type, value_type) => Err(ArrowError::ComputeError(format!(
+                    "filter not supported for Dictionary({:?}, {:?})",
+                    key_type, value_type
+                )))
+            }
+            other => Err(ArrowError::ComputeError(format!(
+                "filter not supported for {:?}",
+                other
+            ))),
         }
-        other => Err(ArrowError::ComputeError(format!(
-            "filter not supported for {:?}",
-            other
-        ))),
     }
+
+    /// Returns a new PrimitiveArray<T> containing only those values from the array passed as the data_array parameter,
+    /// selected by the BooleanArray passed as the filter_array parameter
+    pub fn filter_primitive_array<T>(
+        &self,
+        data_array: &PrimitiveArray<T>,
+    ) -> Result<PrimitiveArray<T>>
+    where
+        T: ArrowNumericType,
+    {
+        let array_type = T::get_data_type();
+        let value_size = mem::size_of::<T::Native>();
+        let array_data_builder =
+            filter_array_impl(self, data_array, array_type, value_size)?;
+        let data = array_data_builder.build();
+        Ok(PrimitiveArray::<T>::from(data))
+    }
+
+    /// Returns a new DictionaryArray<T> containing only those keys from the array passed as the data_array parameter,
+    /// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array.
+    pub fn filter_dictionary_array<T>(
+        &self,
+        data_array: &DictionaryArray<T>,
+    ) -> Result<DictionaryArray<T>>
+    where
+        T: ArrowNumericType,
+    {
+        let array_type = data_array.data_type().clone();
+        let value_size = mem::size_of::<T::Native>();
+        let mut array_data_builder =
+            filter_array_impl(self, data_array, array_type, value_size)?;
+        // copy dictionary values from input array
+        array_data_builder =
+            array_data_builder.add_child_data(data_array.values().data());
+        let data = array_data_builder.build();
+        Ok(DictionaryArray::<T>::from(data))
+    }
+}
+
+/// Returns a new array, containing only the elements matching the filter.
+pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
+    FilterContext::new(filter).filter(array)
+}
+
+/// Returns a new PrimitiveArray<T> containing only those values from the array passed as the data_array parameter,
+/// selected by the BooleanArray passed as the filter_array parameter
+pub fn filter_primitive_array<T>(
+    data_array: &PrimitiveArray<T>,
+    filter_array: &BooleanArray,
+) -> Result<PrimitiveArray<T>>
+where
+    T: ArrowNumericType,
+{
+    FilterContext::new(filter_array).filter_primitive_array(data_array)
+}
+
+/// Returns a new DictionaryArray<T> containing only those keys from the array passed as the data_array parameter,
+/// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array.
+pub fn filter_dictionary_array<T>(
+    data_array: &DictionaryArray<T>,
+    filter_array: &BooleanArray,
+) -> Result<DictionaryArray<T>>
+where
+    T: ArrowNumericType,
+{
+    FilterContext::new(filter_array).filter_dictionary_array(data_array)
+}
+
+/// Returns a new RecordBatch with arrays containing only values matching the filter.
+/// The same FilterContext is re-used when filtering arrays in the RecordBatch for better performance.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter_array: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter_context = FilterContext::new(filter_array);
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_context.filter(a.as_ref()).unwrap())

Review comment:
       Isn't this `.unwrap()` "hiding" errors related to unsupported data types?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org