Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/07/19 11:33:38 UTC

[GitHub] [arrow] yordan-pavlov opened a new pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

yordan-pavlov opened a new pull request #7798:
URL: https://github.com/apache/arrow/pull/7798


   The filter kernel located here https://github.com/apache/arrow/blob/master/rust/arrow/src/compute/kernels/filter.rs
   
   currently has the following performance:
   
   filter old u8 low selectivity time: [1.7782 ms 1.7790 ms 1.7801 ms]
   filter old u8 high selectivity time: [815.58 us 816.58 us 817.57 us]
   filter old u8 w NULLs low selectivity time: [1.8131 ms 1.8231 ms 1.8336 ms]
   filter old u8 w NULLs high selectivity time: [817.41 us 820.01 us 823.05 us]
   
   I have been working on a new implementation which is approximately 14 to 480 times faster, depending mostly on filter selectivity. Here are the benchmark results:
   
   filter u8 low selectivity time: [127.30 us 128.06 us 128.88 us]
   filter u8 high selectivity time: [5.4215 us 5.5778 us 5.7335 us]
   filter context u8 low selectivity time: [124.21 us 126.21 us 128.38 us]
   filter context u8 high selectivity time: [1.6707 us 1.7052 us 1.7476 us]
   filter context u8 w NULLs low selectivity time: [142.40 us 142.83 us 143.37 us]
   filter context u8 w NULLs high selectivity time: [2.3338 us 2.3788 us 2.4304 us]
   filter context f32 low selectivity time: [132.59 us 132.91 us 133.29 us]
   filter context f32 high selectivity time: [1.6864 us 1.7026 us 1.7212 us]
   
   This new implementation is based on a few key ideas:
   
   (1) if the data array being filtered doesn't have a null bitmap, no time should be wasted copying or creating a null bitmap in the resulting filtered data array - this is implemented using the CopyNullBit trait, which has a no-op implementation and an actual implementation
   
   (2) when the filter is highly selective, i.e. only a small number of values from the data array are selected, the filter implementation should efficiently skip entire batches of 0s in the filter array - this is implemented by transmuting the filter array to u64, which makes it possible to quickly check and skip entire batches of 64 bits
   
   (3) when an entire record batch is filtered, any computation which only depends on the filter array is done once and then shared for filtering all the data arrays in the record batch - this is implemented using the FilterContext struct
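
To make idea (2) concrete, here is a minimal, standalone sketch of skipping all-zero 64-bit batches. It is not the code from this PR: `to_u64_words` and `filter_values` are hypothetical names, plain slices and `Vec` stand in for Arrow buffers, and the filter is assumed to use the least-significant-bit-first bitmap layout.

```rust
/// Packs filter bytes (least-significant bit first) into u64 words.
/// A trailing partial chunk is zero-padded, so padding bits never select anything.
fn to_u64_words(filter_bytes: &[u8]) -> Vec<u64> {
    filter_bytes
        .chunks(8)
        .map(|chunk| {
            let mut word = [0u8; 8];
            word[..chunk.len()].copy_from_slice(chunk);
            u64::from_le_bytes(word)
        })
        .collect()
}

/// Copies the selected values, skipping every 64-bit batch that is all zeros.
fn filter_values<T: Copy>(values: &[T], filter_bytes: &[u8]) -> Vec<T> {
    let mut out = Vec::new();
    for (i, word) in to_u64_words(filter_bytes).into_iter().enumerate() {
        if word == 0 {
            // nothing selected in these 64 positions: skip the whole batch
            continue;
        }
        for j in 0..64 {
            if word & (1u64 << j) != 0 {
                let index = i * 64 + j;
                // ignore filter bits that point past the end of the data
                if index < values.len() {
                    out.push(values[index]);
                }
            }
        }
    }
    out
}
```

With a highly selective filter most words are zero, so the inner per-bit loop runs for only a few batches; that is where the large speed-up for sparse filters would be expected to come from.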
   
   This pull request also implements support for filtering dictionary arrays. 
   
   @paddyhoran @andygrove 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove commented on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-670725220


   I completely agree. I just happen to have a protobuf definition for plans and expressions, as well as serde code for Java and Rust, that I would be happy to contribute. We would need to implement it for C++.





[GitHub] [arrow] yordan-pavlov edited a comment on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
yordan-pavlov edited a comment on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-663888189


   @jorgecarleitao thanks for the feedback
   
   I did some more profiling specifically around the unsafe parts of the code and found that the safe version of `copy_null_bit` is just as fast, so I have removed that unsafe section; here are some benchmark results:
   
   copy_null_bit unsafe:
   filter context u8 w NULLs low selectivity  time:   [142.05 us 142.35 us 142.68 us]
   filter context u8 w NULLs high selectivity time:   [2.0915 us 2.1015 us 2.1127 us]
   
   copy_null_bit safe:
   filter context u8 w NULLs low selectivity  time:   [134.74 us 134.86 us 134.98 us]
   filter context u8 w NULLs high selectivity time:   [2.0536 us 2.0613 us 2.0707 us]
   
   I also benchmarked replacing the unsafe section in the `filter_array_impl` method with `value_buffer.write()`, but this results in an approximately 18% drop in performance with sparse filter arrays, as can be seen from the benchmark results below:
   
   filter u8 low selectivity
                           time:   [131.08 us 132.46 us 134.27 us]
                           change: [+13.141% +17.189% +22.115%] (p = 0.00 < 0.05)
   
   filter context u8 low selectivity
                           time:   [127.47 us 129.27 us 131.56 us]
                           change: [+12.008% +19.674% +27.939%] (p = 0.00 < 0.05)
   
   filter context u8 w NULLs low selectivity
                           time:   [154.32 us 155.27 us 156.79 us]
                           change: [+15.444% +17.846% +22.268%] (p = 0.00 < 0.05)
   
   filter context f32 low selectivity
                           time:   [137.62 us 138.01 us 138.52 us]
                           change: [+12.495% +18.180% +23.088%] (p = 0.00 < 0.05)
   
   Finally, looking at the C++ implementation inspired me to change the `filter_array_impl` method to add a special case where the 64-bit filter batch is all 1s. This doesn't appear to reduce performance in other cases, but improves performance of filtering with very dense filter arrays (almost all 1s) by about 20 times; here are the latest benchmark results:
   
   filter u8 low selectivity                      time:   [109.75 us 110.30 us 111.02 us]
   filter u8 high selectivity                     time:   [4.8372 us 4.8433 us 4.8502 us]
   filter u8 very low selectivity                 time:   [11.782 us 11.798 us 11.816 us]
   
   filter context u8 low selectivity              time:   [109.07 us 109.65 us 110.66 us]
   filter context u8 high selectivity             time:   [1.4704 us 1.4762 us 1.4842 us]
   filter context u8 very low selectivity         time:   [8.8455 us 9.0530 us 9.3171 us]
   
   filter context u8 w NULLs low selectivity      time:   [142.65 us 143.53 us 144.81 us]
   filter context u8 w NULLs high selectivity     time:   [2.2111 us 2.2297 us 2.2514 us]
   filter context u8 w NULLs very low selectivity time:   [161.89 us 167.90 us 175.02 us]
   
   filter context f32 low selectivity             time:   [138.71 us 139.83 us 141.41 us]
   filter context f32 high selectivity            time:   [1.6111 us 1.6342 us 1.6605 us]
   filter context f32 very low selectivity        time:   [19.719 us 19.865 us 20.045 us]
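
For illustration, the all-1s special case described above could look roughly like the following sketch. It is an assumption-laden stand-in rather than the actual `filter_array_impl` change: `filter_values_dense` is a hypothetical name, the filter is assumed to be already packed into u64 words (least-significant bit first), and a `Vec` replaces the Arrow value buffer.

```rust
/// Filters `values` using pre-packed 64-bit filter words, with fast paths
/// for batches that are all 0s (skip) and all 1s (bulk copy).
fn filter_values_dense<T: Copy>(values: &[T], filter_words: &[u64]) -> Vec<T> {
    let mut out = Vec::new();
    for (i, &word) in filter_words.iter().enumerate() {
        let start = i * 64;
        if start >= values.len() {
            break;
        }
        let end = (start + 64).min(values.len());
        if word == 0 {
            // nothing selected: skip the batch
            continue;
        } else if word == u64::MAX {
            // everything selected: copy the whole batch in one call
            out.extend_from_slice(&values[start..end]);
        } else {
            // mixed batch: test each bit individually
            for (j, value) in values[start..end].iter().enumerate() {
                if word & (1u64 << j) != 0 {
                    out.push(*value);
                }
            }
        }
    }
    out
}
```

The bulk copy replaces 64 bit tests and 64 single-value copies with one slice copy, which is consistent with the gain being visible mainly in the "very low selectivity" benchmarks.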
   
   @andygrove  any thoughts?





[GitHub] [arrow] wesm commented on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
wesm commented on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-660678597


   I refer you all to the work that we've recently done in C++, where we use popcount on 64 bits at a time to efficiently compute the size of the filter output as well as quickly compute the filtered output. It might be worth replicating the "BitBlockCounter" concept in Rust.
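
As a rough illustration of the popcount idea only, and not of the C++ BitBlockCounter API, the sketch below counts selected values 64 bits at a time with `u64::count_ones` and classifies each block. The names `count_selected`, `Block` and `classify` are hypothetical.

```rust
/// Number of values a filter selects, computed one 64-bit word at a time.
fn count_selected(filter_words: &[u64]) -> usize {
    filter_words.iter().map(|w| w.count_ones() as usize).sum()
}

/// Coarse classification of a 64-bit filter block by its popcount.
#[derive(Debug, PartialEq)]
enum Block {
    Empty,        // no bits set: the block can be skipped
    Full,         // all 64 bits set: the block can be copied in bulk
    Partial(u32), // some bits set: the bits must be inspected one by one
}

fn classify(word: u64) -> Block {
    match word.count_ones() {
        0 => Block::Empty,
        64 => Block::Full,
        n => Block::Partial(n),
    }
}
```

Knowing the total count up front lets the output buffer be allocated once at its final size, and the per-block classification tells the copy loop which of the three paths to take.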





[GitHub] [arrow] yordan-pavlov commented on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-667344474


   @paddyhoran yes, you are right. I added a couple more tests for sliced arrays and they didn't pass, so, seeing that the PR was not yet merged, I added a few small changes to:
   (1) implement support for filtering of sliced / offset data arrays
   (2) return an error if the filter array is offset - I thought it better to make this obvious rather than hide it
   
   I looked briefly into implementing support for offset filter arrays but I couldn't figure out how to do it quickly - I might have to have another look at the C++ implementation and submit a separate PR for that.
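
One simple, deliberately slow way to support an offset filter would be to re-pack its bits into zero-aligned u64 words before filtering. The sketch below only illustrates that idea under stated assumptions: it is neither what this PR does nor how the C++ implementation handles offsets, `get_bit` and `realign` are hypothetical helpers, and the bitmap is assumed to be least-significant-bit first.

```rust
/// Reads bit `i` of a bitmap stored least-significant bit first.
fn get_bit(bytes: &[u8], i: usize) -> bool {
    bytes[i / 8] & (1 << (i % 8)) != 0
}

/// Re-packs `len` bits starting at bit `offset` into u64 words that start at bit 0.
/// Bit-by-bit copying keeps the sketch short; a real kernel would shift whole words.
fn realign(bytes: &[u8], offset: usize, len: usize) -> Vec<u64> {
    let mut words = vec![0u64; (len + 63) / 64];
    for i in 0..len {
        if get_bit(bytes, offset + i) {
            words[i / 64] |= 1u64 << (i % 64);
        }
    }
    words
}
```

The re-packed words could then be fed to the same batch-skipping loop used for non-offset filters, at the cost of one extra pass over the filter.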





[GitHub] [arrow] andygrove commented on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-663933810


   Sorry @yordan-pavlov but I didn't get a chance to review yet. I hope to get to it soon.





[GitHub] [arrow] andygrove commented on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
andygrove commented on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-661255507


   @yordan-pavlov This is exciting! I will start reviewing this later today.





[GitHub] [arrow] jorgecarleitao commented on a change in pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
jorgecarleitao commented on a change in pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#discussion_r460365103



##########
File path: rust/arrow/src/compute/kernels/filter.rs
##########
@@ -17,139 +17,443 @@
 
 //! Defines miscellaneous array kernels.
 
-use std::sync::Arc;
-
 use crate::array::*;
 use crate::datatypes::{ArrowNumericType, DataType, TimeUnit};
 use crate::error::{ArrowError, Result};
+use crate::record_batch::RecordBatch;
+use crate::{
+    bitmap::Bitmap,
+    buffer::{Buffer, MutableBuffer},
+    memory,
+    util::bit_util,
+};
+use std::{mem, sync::Arc};
 
-/// Helper function to perform boolean lambda function on values from two arrays.
-fn bool_op<T, F>(
-    left: &PrimitiveArray<T>,
-    right: &PrimitiveArray<T>,
-    op: F,
-) -> Result<BooleanArray>
-where
-    T: ArrowNumericType,
-    F: Fn(Option<T::Native>, Option<T::Native>) -> bool,
-{
-    if left.len() != right.len() {
-        return Err(ArrowError::ComputeError(
-            "Cannot perform math operation on arrays of different length".to_string(),
-        ));
+/// trait for copying filtered null bitmap bits
+trait CopyNullBit {
+    fn copy_null_bit(&mut self, source_index: usize);
+    fn null_count(&self) -> usize;
+    fn null_buffer(&mut self) -> Buffer;
+}
+
+/// no-op null bitmap copy implementation,
+/// used when the filtered data array doesn't have a null bitmap
+struct NullBitNoop {}
+
+impl NullBitNoop {
+    fn new() -> Self {
+        NullBitNoop {}
     }
-    let mut b = BooleanArray::builder(left.len());
-    for i in 0..left.len() {
-        let index = i;
-        let l = if left.is_null(i) {
-            None
-        } else {
-            Some(left.value(index))
-        };
-        let r = if right.is_null(i) {
-            None
-        } else {
-            Some(right.value(index))
-        };
-        b.append_value(op(l, r))?;
+}
+
+impl CopyNullBit for NullBitNoop {
+    #[inline]
+    fn copy_null_bit(&mut self, _source_index: usize) {
+        // do nothing
+    }
+
+    fn null_count(&self) -> usize {
+        0
+    }
+
+    fn null_buffer(&mut self) -> Buffer {
+        Buffer::from([0u8; 0])
     }
-    Ok(b.finish())
 }
 
-macro_rules! filter_array {
-    ($array:expr, $filter:expr, $array_type:ident) => {{
-        let b = $array.as_any().downcast_ref::<$array_type>().unwrap();
-        let mut builder = $array_type::builder(b.len());
-        for i in 0..b.len() {
-            if $filter.value(i) {
-                if b.is_null(i) {
-                    builder.append_null()?;
-                } else {
-                    builder.append_value(b.value(i))?;
-                }
-            }
-        }
-        Ok(Arc::new(builder.finish()))
-    }};
+/// null bitmap copy implementation,
+/// used when the filtered data array has a null bitmap
+struct NullBitSetter<'a> {
+    target_buffer: MutableBuffer,
+    source_bytes: &'a [u8],
+    target_index: usize,
+    null_count: usize,
 }
 
-/// Returns the array, taking only the elements matching the filter
-pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
-    match array.data_type() {
-        DataType::UInt8 => filter_array!(array, filter, UInt8Array),
-        DataType::UInt16 => filter_array!(array, filter, UInt16Array),
-        DataType::UInt32 => filter_array!(array, filter, UInt32Array),
-        DataType::UInt64 => filter_array!(array, filter, UInt64Array),
-        DataType::Int8 => filter_array!(array, filter, Int8Array),
-        DataType::Int16 => filter_array!(array, filter, Int16Array),
-        DataType::Int32 => filter_array!(array, filter, Int32Array),
-        DataType::Int64 => filter_array!(array, filter, Int64Array),
-        DataType::Float32 => filter_array!(array, filter, Float32Array),
-        DataType::Float64 => filter_array!(array, filter, Float64Array),
-        DataType::Boolean => filter_array!(array, filter, BooleanArray),
-        DataType::Date32(_) => filter_array!(array, filter, Date32Array),
-        DataType::Date64(_) => filter_array!(array, filter, Date64Array),
-        DataType::Time32(TimeUnit::Second) => {
-            filter_array!(array, filter, Time32SecondArray)
-        }
-        DataType::Time32(TimeUnit::Millisecond) => {
-            filter_array!(array, filter, Time32MillisecondArray)
-        }
-        DataType::Time64(TimeUnit::Microsecond) => {
-            filter_array!(array, filter, Time64MicrosecondArray)
-        }
-        DataType::Time64(TimeUnit::Nanosecond) => {
-            filter_array!(array, filter, Time64NanosecondArray)
-        }
-        DataType::Duration(TimeUnit::Second) => {
-            filter_array!(array, filter, DurationSecondArray)
-        }
-        DataType::Duration(TimeUnit::Millisecond) => {
-            filter_array!(array, filter, DurationMillisecondArray)
+impl<'a> NullBitSetter<'a> {
+    fn new(null_bitmap: &'a Bitmap) -> Self {
+        let null_bytes = null_bitmap.buffer_ref().data();
+        // create null bitmap buffer with same length and initialize null bitmap buffer to 1s
+        let null_buffer =
+            MutableBuffer::new(null_bytes.len()).with_bitset(null_bytes.len(), true);
+        NullBitSetter {
+            source_bytes: null_bytes,
+            target_buffer: null_buffer,
+            target_index: 0,
+            null_count: 0,
         }
-        DataType::Duration(TimeUnit::Microsecond) => {
-            filter_array!(array, filter, DurationMicrosecondArray)
-        }
-        DataType::Duration(TimeUnit::Nanosecond) => {
-            filter_array!(array, filter, DurationNanosecondArray)
-        }
-        DataType::Timestamp(TimeUnit::Second, _) => {
-            filter_array!(array, filter, TimestampSecondArray)
+    }
+}
+
+impl<'a> CopyNullBit for NullBitSetter<'a> {
+    #[inline]
+    fn copy_null_bit(&mut self, source_index: usize) {
+        if !bit_util::get_bit(self.source_bytes, source_index) {
+            // this is not actually unsafe because of the condition above + target_buffer.len() == source_bytes.len()
+            unsafe {
+                bit_util::unset_bit_raw(
+                    self.target_buffer.raw_data_mut(),
+                    self.target_index,
+                );
+            }
+            self.null_count += 1;
         }
-        DataType::Timestamp(TimeUnit::Millisecond, _) => {
-            filter_array!(array, filter, TimestampMillisecondArray)
+        self.target_index += 1;
+    }
+
+    fn null_count(&self) -> usize {
+        self.null_count
+    }
+
+    fn null_buffer(&mut self) -> Buffer {
+        self.target_buffer.resize(self.target_index).unwrap();
+        // use mem::replace to detach self.target_buffer from self so that it can be returned
+        let target_buffer = mem::replace(&mut self.target_buffer, MutableBuffer::new(0));
+        target_buffer.freeze()
+    }
+}
+
+fn get_null_bit_setter<'a>(data_array: &'a impl Array) -> Box<CopyNullBit + 'a> {
+    if let Some(null_bitmap) = data_array.data_ref().null_bitmap() {
+        // only return an actual null bit copy implementation if null_bitmap is set
+        Box::new(NullBitSetter::new(null_bitmap))
+    } else {
+        // otherwise return a no-op copy null bit implementation
+        // for improved performance when the filtered array doesn't contain NULLs
+        Box::new(NullBitNoop::new())
+    }
+}
+
+// transmute filter array to u64
+// - optimize filtering with highly selective filters by skipping entire batches of 64 filter bits
+// - if the data array being filtered doesn't have a null bitmap, no time is wasted to copy a null bitmap
+fn filter_array_impl(
+    filter_context: &FilterContext,
+    data_array: &impl Array,
+    array_type: DataType,
+    value_size: usize,
+) -> Result<ArrayDataBuilder> {
+    if filter_context.filter_len > data_array.len() {
+        return Err(ArrowError::ComputeError(
+            "Filter array cannot be larger than data array".to_string(),
+        ));
+    }
+    let filtered_count = filter_context.filtered_count;
+    let filter_mask = &filter_context.filter_mask;
+    let filter_u64 = &filter_context.filter_u64;
+    let data_start = data_array.data_ref().buffers()[0].raw_data();
+    let mut value_buffer = MutableBuffer::new(filtered_count * value_size);
+    value_buffer.resize(filtered_count * value_size)?;
+    let mut value_position = value_buffer.raw_data_mut();
+    let mut null_bit_setter = get_null_bit_setter(data_array);
+    let null_bit_setter = null_bit_setter.as_mut();
+
+    for (i, filter_batch) in filter_u64.iter().enumerate() {
+        // foreach u64 batch
+        let filter_batch = *filter_batch;
+        if filter_batch == 0 {
+            // if batch == 0: skip
+            continue;
         }
-        DataType::Timestamp(TimeUnit::Microsecond, _) => {
-            filter_array!(array, filter, TimestampMicrosecondArray)
+        for (j, filter_mask) in filter_mask.iter().enumerate() {
+            // foreach bit in batch:
+            if (filter_batch & *filter_mask) != 0 {
+                let data_index = (i * 64) + j;
+                null_bit_setter.copy_null_bit(data_index);
+                // if filter bit == 1: copy data value to temp array
+                unsafe {
+                    // this should be safe because of the data_array.len() check at the beginning of the method
+                    memory::memcpy(
+                        value_position,
+                        data_start.add(value_size * data_index),
+                        value_size,
+                    );
+                    value_position = value_position.add(value_size);
+                }
+            }
         }
-        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-            filter_array!(array, filter, TimestampNanosecondArray)
+    }
+
+    let mut array_data_builder = ArrayDataBuilder::new(array_type)
+        .len(filtered_count)
+        .add_buffer(value_buffer.freeze());
+    if null_bit_setter.null_count() > 0 {
+        array_data_builder = array_data_builder
+            .null_count(null_bit_setter.null_count())
+            .null_bit_buffer(null_bit_setter.null_buffer());
+    }
+
+    Ok(array_data_builder)
+}
+
+/// FilterContext can be used to improve performance when
+/// filtering multiple data arrays with the same filter array.
+#[derive(Debug)]
+pub struct FilterContext {
+    filter_u64: Vec<u64>,
+    filter_len: usize,
+    filtered_count: usize,
+    filter_mask: Vec<u64>,
+}
+
+macro_rules! filter_primitive_array {
+    ($context:expr, $array:expr, $array_type:ident) => {{
+        let input_array = $array.as_any().downcast_ref::<$array_type>().unwrap();
+        let output_array = $context.filter_primitive_array(input_array)?;
+        Ok(Arc::new(output_array))
+    }};
+}
+
+macro_rules! filter_dictionary_array {
+    ($context:expr, $array:expr, $array_type:ident) => {{
+        let input_array = $array.as_any().downcast_ref::<$array_type>().unwrap();
+        let output_array = $context.filter_dictionary_array(input_array)?;
+        Ok(Arc::new(output_array))
+    }};
+}
+
+impl FilterContext {
+    /// Returns a new instance of FilterContext
+    pub fn new(filter_array: &BooleanArray) -> Self {
+        let filter_mask: Vec<u64> = (0..64).map(|x| 1u64 << x).collect();
+        let filter_bytes = filter_array.data_ref().buffers()[0].data();
+        let filtered_count = bit_util::count_set_bits(filter_bytes);
+        // transmute filter_bytes to &[u64]
+        let mut u64_buffer = MutableBuffer::new(filter_bytes.len());
+        u64_buffer
+            .write_bytes(filter_bytes, u64_buffer.capacity() - filter_bytes.len())
+            .unwrap();
+        let filter_u64 = u64_buffer.typed_data_mut::<u64>().to_owned();
+        FilterContext {
+            filter_u64,
+            filter_len: filter_array.len(),
+            filtered_count,
+            filter_mask,
         }
-        DataType::Binary => {
-            let b = array.as_any().downcast_ref::<BinaryArray>().unwrap();
-            let mut values: Vec<&[u8]> = Vec::with_capacity(b.len());
-            for i in 0..b.len() {
-                if filter.value(i) {
-                    values.push(b.value(i));
+    }
+
+    /// Returns a new array, containing only the elements matching the filter
+    pub fn filter(&self, array: &Array) -> Result<ArrayRef> {
+        match array.data_type() {
+            DataType::UInt8 => filter_primitive_array!(self, array, UInt8Array),
+            DataType::UInt16 => filter_primitive_array!(self, array, UInt16Array),
+            DataType::UInt32 => filter_primitive_array!(self, array, UInt32Array),
+            DataType::UInt64 => filter_primitive_array!(self, array, UInt64Array),
+            DataType::Int8 => filter_primitive_array!(self, array, Int8Array),
+            DataType::Int16 => filter_primitive_array!(self, array, Int16Array),
+            DataType::Int32 => filter_primitive_array!(self, array, Int32Array),
+            DataType::Int64 => filter_primitive_array!(self, array, Int64Array),
+            DataType::Float32 => filter_primitive_array!(self, array, Float32Array),
+            DataType::Float64 => filter_primitive_array!(self, array, Float64Array),
+            DataType::Boolean => {
+                let input_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+                let mut builder = BooleanArray::builder(self.filtered_count);
+                for i in 0..self.filter_u64.len() {
+                    // foreach u64 batch
+                    let filter_batch = self.filter_u64[i];
+                    if filter_batch == 0 {
+                        // if batch == 0: skip
+                        continue;
+                    }
+                    for j in 0..64 {
+                        // foreach bit in batch:
+                        if (filter_batch & self.filter_mask[j]) != 0 {
+                            let data_index = (i * 64) + j;
+                            if input_array.is_null(data_index) {
+                                builder.append_null()?;
+                            } else {
+                                builder.append_value(input_array.value(data_index))?;
+                            }
+                        }
+                    }
                 }
+                Ok(Arc::new(builder.finish()))
+            },
+            DataType::Date32(_) => filter_primitive_array!(self, array, Date32Array),
+            DataType::Date64(_) => filter_primitive_array!(self, array, Date64Array),
+            DataType::Time32(TimeUnit::Second) => {
+                filter_primitive_array!(self, array, Time32SecondArray)
             }
-            Ok(Arc::new(BinaryArray::from(values)))
-        }
-        DataType::Utf8 => {
-            let b = array.as_any().downcast_ref::<StringArray>().unwrap();
-            let mut values: Vec<&str> = Vec::with_capacity(b.len());
-            for i in 0..b.len() {
-                if filter.value(i) {
-                    values.push(b.value(i));
+            DataType::Time32(TimeUnit::Millisecond) => {
+                filter_primitive_array!(self, array, Time32MillisecondArray)
+            }
+            DataType::Time64(TimeUnit::Microsecond) => {
+                filter_primitive_array!(self, array, Time64MicrosecondArray)
+            }
+            DataType::Time64(TimeUnit::Nanosecond) => {
+                filter_primitive_array!(self, array, Time64NanosecondArray)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                filter_primitive_array!(self, array, DurationSecondArray)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                filter_primitive_array!(self, array, DurationMillisecondArray)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                filter_primitive_array!(self, array, DurationMicrosecondArray)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                filter_primitive_array!(self, array, DurationNanosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Second, _) => {
+                filter_primitive_array!(self, array, TimestampSecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, _) => {
+                filter_primitive_array!(self, array, TimestampMillisecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Microsecond, _) => {
+                filter_primitive_array!(self, array, TimestampMicrosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                filter_primitive_array!(self, array, TimestampNanosecondArray)
+            }
+            DataType::Binary => {
+                let input_array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
+                let mut values: Vec<&[u8]> = Vec::with_capacity(self.filtered_count);
+                for i in 0..self.filter_u64.len() {
+                    // foreach u64 batch
+                    let filter_batch = self.filter_u64[i];
+                    if filter_batch == 0 {
+                        // if batch == 0: skip
+                        continue;
+                    }
+                    for j in 0..64 {
+                        // foreach bit in batch:
+                        if (filter_batch & self.filter_mask[j]) != 0 {
+                            let data_index = (i * 64) + j;
+                            values.push(input_array.value(data_index));
+                        }
+                    }
                 }
+                Ok(Arc::new(BinaryArray::from(values)))
             }
-            Ok(Arc::new(StringArray::from(values)))
+            DataType::Utf8 => {
+                let input_array = array.as_any().downcast_ref::<StringArray>().unwrap();
+                let mut values: Vec<&str> = Vec::with_capacity(self.filtered_count);
+                for i in 0..self.filter_u64.len() {
+                    // foreach u64 batch
+                    let filter_batch = self.filter_u64[i];
+                    if filter_batch == 0 {
+                        // if batch == 0: skip
+                        continue;
+                    }
+                    for j in 0..64 {
+                        // foreach bit in batch:
+                        if (filter_batch & self.filter_mask[j]) != 0 {
+                            let data_index = (i * 64) + j;
+                            values.push(input_array.value(data_index));
+                        }
+                    }
+                }
+                Ok(Arc::new(StringArray::from(values)))
+            }
+            DataType::Dictionary(ref key_type, ref value_type) => match (key_type.as_ref(), value_type.as_ref()) {
+                (key_type, DataType::Utf8) => match key_type {
+                    DataType::UInt8 => filter_dictionary_array!(self, array, UInt8DictionaryArray),
+                    DataType::UInt16 => filter_dictionary_array!(self, array, UInt16DictionaryArray),
+                    DataType::UInt32 => filter_dictionary_array!(self, array, UInt32DictionaryArray),
+                    DataType::UInt64 => filter_dictionary_array!(self, array, UInt64DictionaryArray),
+                    DataType::Int8 => filter_dictionary_array!(self, array, Int8DictionaryArray),
+                    DataType::Int16 => filter_dictionary_array!(self, array, Int16DictionaryArray),
+                    DataType::Int32 => filter_dictionary_array!(self, array, Int32DictionaryArray),
+                    DataType::Int64 => filter_dictionary_array!(self, array, Int64DictionaryArray),
+                    other => Err(ArrowError::ComputeError(format!(
+                        "filter not supported for string dictionary with key of type {:?}",
+                        other
+                    )))
+                }
+                (key_type, value_type) => Err(ArrowError::ComputeError(format!(
+                    "filter not supported for Dictionary({:?}, {:?})",
+                    key_type, value_type
+                )))
+            }
+            other => Err(ArrowError::ComputeError(format!(
+                "filter not supported for {:?}",
+                other
+            ))),
         }
-        other => Err(ArrowError::ComputeError(format!(
-            "filter not supported for {:?}",
-            other
-        ))),
     }
+
+    /// Returns a new PrimitiveArray<T> containing only those values from the array passed as the data_array parameter,
+    /// selected by the BooleanArray passed as the filter_array parameter
+    pub fn filter_primitive_array<T>(
+        &self,
+        data_array: &PrimitiveArray<T>,
+    ) -> Result<PrimitiveArray<T>>
+    where
+        T: ArrowNumericType,
+    {
+        let array_type = T::get_data_type();
+        let value_size = mem::size_of::<T::Native>();
+        let array_data_builder =
+            filter_array_impl(self, data_array, array_type, value_size)?;
+        let data = array_data_builder.build();
+        Ok(PrimitiveArray::<T>::from(data))
+    }
+
+    /// Returns a new DictionaryArray<T> containing only those keys from the array passed as the data_array parameter,
+    /// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array.
+    pub fn filter_dictionary_array<T>(
+        &self,
+        data_array: &DictionaryArray<T>,
+    ) -> Result<DictionaryArray<T>>
+    where
+        T: ArrowNumericType,
+    {
+        let array_type = data_array.data_type().clone();
+        let value_size = mem::size_of::<T::Native>();
+        let mut array_data_builder =
+            filter_array_impl(self, data_array, array_type, value_size)?;
+        // copy dictionary values from input array
+        array_data_builder =
+            array_data_builder.add_child_data(data_array.values().data());
+        let data = array_data_builder.build();
+        Ok(DictionaryArray::<T>::from(data))
+    }
+}
+
+/// Returns a new array, containing only the elements matching the filter.
+pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
+    FilterContext::new(filter).filter(array)
+}
+
+/// Returns a new PrimitiveArray<T> containing only those values from the array passed as the data_array parameter,
+/// selected by the BooleanArray passed as the filter_array parameter
+pub fn filter_primitive_array<T>(
+    data_array: &PrimitiveArray<T>,
+    filter_array: &BooleanArray,
+) -> Result<PrimitiveArray<T>>
+where
+    T: ArrowNumericType,
+{
+    FilterContext::new(filter_array).filter_primitive_array(data_array)
+}
+
+/// Returns a new DictionaryArray<T> containing only those keys from the array passed as the data_array parameter,
+/// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array.
+pub fn filter_dictionary_array<T>(
+    data_array: &DictionaryArray<T>,
+    filter_array: &BooleanArray,
+) -> Result<DictionaryArray<T>>
+where
+    T: ArrowNumericType,
+{
+    FilterContext::new(filter_array).filter_dictionary_array(data_array)
+}
+
+/// Returns a new RecordBatch with arrays containing only values matching the filter.
+/// The same FilterContext is re-used when filtering arrays in the RecordBatch for better performance.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter_array: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter_context = FilterContext::new(filter_array);
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_context.filter(a.as_ref()).unwrap())

Review comment:
       Isn't this `.unwrap()` "hiding" errors related to unsupported data types?
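
A minimal sketch of one way to surface such errors instead of panicking, assuming each per-column filter call returns a `Result`. The names `FilterError`, `FilterResult`, `filter_column` and `filter_all` are hypothetical stand-ins for illustration and are not part of the Arrow crate; the point is only that collecting an iterator of `Result`s into a `Result<Vec<_>, _>` stops at the first error and returns it to the caller.

```rust
// Hypothetical stand-in for an error type such as ArrowError.
#[derive(Debug, PartialEq)]
enum FilterError {
    Unsupported(String),
}

type FilterResult<T> = std::result::Result<T, FilterError>;

// Hypothetical per-column filter: fails for columns whose name marks them
// as an unsupported type, succeeds otherwise.
fn filter_column(column: &str) -> FilterResult<String> {
    if column.starts_with("unsupported") {
        Err(FilterError::Unsupported(format!(
            "filter not supported for {:?}",
            column
        )))
    } else {
        Ok(format!("filtered({})", column))
    }
}

// Collecting into FilterResult<Vec<_>> short-circuits on the first Err,
// so the error reaches the caller instead of causing a panic.
fn filter_all(columns: &[&str]) -> FilterResult<Vec<String>> {
    columns
        .iter()
        .map(|c| filter_column(c))
        .collect::<FilterResult<Vec<String>>>()
}
```

With this shape, a caller can use `?` on the collected result, and an unsupported column yields an `Err` value rather than aborting the whole operation.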




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] andygrove closed pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
andygrove closed pull request #7798:
URL: https://github.com/apache/arrow/pull/7798


   





[GitHub] [arrow] yordan-pavlov commented on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-663888189


   @jorgecarleitao thanks for the feedback
   
   I did some more profiling specifically around the unsafe parts of the code and found that the safe version of `copy_null_bit` is just as fast, so I have removed that unsafe section; here are some benchmark results:
   
   copy_null_bit unsafe:
   filter context u8 w NULLs low selectivity  time:   [142.05 us 142.35 us 142.68 us]
   filter context u8 w NULLs high selectivity time:   [2.0915 us 2.1015 us 2.1127 us]
   
   copy_null_bit safe:
   filter context u8 w NULLs low selectivity  time:   [134.74 us 134.86 us 134.98 us]
   filter context u8 w NULLs high selectivity time:   [2.0536 us 2.0613 us 2.0707 us]
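
For illustration, a safe variant of the null-bit copy could look roughly like the sketch below. This is not the code from the PR: `NullBitCopier` and the two bit helpers are hypothetical names, and a plain `Vec<u8>` stands in for Arrow's `MutableBuffer`. It assumes the target bitmap is pre-filled with 1s and has the same byte length as the source, so bounds-checked indexing never goes out of range.

```rust
/// Returns true if bit `i` is set (least-significant bit first within each byte).
fn get_bit(bytes: &[u8], i: usize) -> bool {
    bytes[i / 8] & (1u8 << (i % 8)) != 0
}

/// Clears bit `i` using ordinary bounds-checked slice indexing.
fn unset_bit(bytes: &mut [u8], i: usize) {
    bytes[i / 8] &= !(1u8 << (i % 8));
}

/// Hypothetical stand-in for the PR's null bit setter.
struct NullBitCopier<'a> {
    source: &'a [u8],
    target: Vec<u8>,
    target_index: usize,
    null_count: usize,
}

impl<'a> NullBitCopier<'a> {
    fn new(source: &'a [u8]) -> Self {
        NullBitCopier {
            source,
            // target starts as all 1s (all valid), same byte length as the source
            target: vec![0xFF; source.len()],
            target_index: 0,
            null_count: 0,
        }
    }

    /// Copies the validity bit of one selected source element to the next
    /// position in the target bitmap.
    fn copy_null_bit(&mut self, source_index: usize) {
        if !get_bit(self.source, source_index) {
            unset_bit(&mut self.target, self.target_index);
            self.null_count += 1;
        }
        self.target_index += 1;
    }
}
```

Because at most one target bit is written per selected source element, `target_index` never exceeds the number of source bits, which is why the bounds checks cannot fail under the stated assumption.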
   
   I also benchmarked replacing the unsafe section in the `filter_array_impl` method with `value_buffer.write()`, but this results in an approximately 17% drop in performance with sparse filter arrays, as can be seen from the benchmark results below:
   
   filter u8 low selectivity
                           time:   [131.08 us 132.46 us 134.27 us]
                           change: [+13.141% +17.189% +22.115%] (p = 0.00 < 0.05)
   
   filter context u8 low selectivity
                           time:   [127.47 us 129.27 us 131.56 us]
                           change: [+12.008% +19.674% +27.939%] (p = 0.00 < 0.05)
   
   filter context u8 w NULLs low selectivity
                           time:   [154.32 us 155.27 us 156.79 us]
                           change: [+15.444% +17.846% +22.268%] (p = 0.00 < 0.05)
   
   filter context f32 low selectivity
                           time:   [137.62 us 138.01 us 138.52 us]
                           change: [+12.495% +18.180% +23.088%] (p = 0.00 < 0.05)
   
   Finally, looking at the C++ implementation inspired me to change the `filter_array_impl` method to add a special case where the 64-bit filter batch is all 1s. This doesn't appear to reduce performance in other cases, but it improves performance of filtering with very dense filter arrays (almost all 1s) by about 20 times; here are the latest benchmark results:
   
   filter u8 low selectivity                      time:   [109.75 us 110.30 us 111.02 us]
   filter u8 high selectivity                     time:   [4.8372 us 4.8433 us 4.8502 us]
   filter u8 very low selectivity                 time:   [11.782 us 11.798 us 11.816 us]
   filter context u8 low selectivity              time:   [109.07 us 109.65 us 110.66 us]
   filter context u8 high selectivity             time:   [1.4704 us 1.4762 us 1.4842 us]
   filter context u8 very low selectivity         time:   [8.8455 us 9.0530 us 9.3171 us]
   filter context u8 w NULLs low selectivity      time:   [135.32 us 135.49 us 135.66 us]
   filter context u8 w NULLs high selectivity     time:   [2.0579 us 2.0680 us 2.0796 us]
   filter context u8 w NULLs very low selectivity time:   [11.583 us 11.668 us 11.780 us]
   filter context f32 low selectivity             time:   [138.71 us 139.83 us 141.41 us]
   filter context f32 high selectivity            time:   [1.6111 us 1.6342 us 1.6605 us]
   filter context f32 very low selectivity        time:   [19.719 us 19.865 us 20.045 us]
   
   @andygrove  any thoughts?
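
The batch handling described above can be sketched as follows. This is a simplified illustration under stated assumptions, not the PR's `filter_array_impl`: it works on a plain `&[u8]` of one-byte values, ignores null bitmaps, and the function name `filter_u64_batches` is hypothetical. Bit `j` of filter word `i` is assumed to select `data[i * 64 + j]`.

```rust
/// Filters `data` with a bitmask packed into u64 words.
fn filter_u64_batches(data: &[u8], filter: &[u64]) -> Vec<u8> {
    let mut out = Vec::new();
    for (i, &batch) in filter.iter().enumerate() {
        let start = i * 64;
        if start >= data.len() {
            break;
        }
        let end = (start + 64).min(data.len());
        if batch == 0 {
            // no bits set: skip all 64 values at once
            continue;
        }
        if batch == u64::MAX && end - start == 64 {
            // all bits set: copy the whole run in one call
            out.extend_from_slice(&data[start..end]);
            continue;
        }
        // mixed batch: test each bit individually
        for j in 0..(end - start) {
            if batch & (1u64 << j) != 0 {
                out.push(data[start + j]);
            }
        }
    }
    out
}
```

The two fast paths cover the extremes: very sparse filters mostly hit the all-zeros branch, very dense filters mostly hit the all-ones branch, and only mixed batches pay for the per-bit loop.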





[GitHub] [arrow] wesm commented on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
wesm commented on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-670723322


   This PR makes me think that at some point (when someone gets really motivated), it would be interesting to implement a language-independent benchmark harness for certain operations (such as those that can be represented using e.g. a standard protobuf serialized operation/expression) so that we can get apples-to-apples numbers for certain operations across implementations. It would be interesting to know e.g. how the Rust implementation of filtering compares with the C++





[GitHub] [arrow] github-actions[bot] commented on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-660631547


   https://issues.apache.org/jira/browse/ARROW-9523





[GitHub] [arrow] yordan-pavlov edited a comment on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
yordan-pavlov edited a comment on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-667344474


   @paddyhoran yes, you are right. I added a couple more tests for sliced arrays and they didn't pass, so, seeing that the PR was not yet merged, I added a few small changes to:
   (1) implement support for filtering of sliced / offset data arrays
   (2) return an error if the filter array is offset - I thought it better to make this obvious rather than silently process the offset filter array incorrectly
   
   I looked briefly into implementing support for offset filter arrays but I couldn't figure out how to do it quickly - I might have to take another look at the C++ implementation and submit a separate PR for that.
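
As a rough illustration of the two behaviours listed above, the sketch below filters a sliced data array by adding its offset to every index, and rejects a filter that has a non-zero offset. It is a simplified sketch, not the PR's code: `filter_sliced` and its parameters are hypothetical, values are plain bytes, null bitmaps are ignored, and `filter_bits` is assumed to hold at least `data_len` bits.

```rust
/// Returns true if bit `i` is set (least-significant bit first within each byte).
fn get_bit(bytes: &[u8], i: usize) -> bool {
    bytes[i / 8] & (1u8 << (i % 8)) != 0
}

/// Filters the logical slice `values[data_offset..data_offset + data_len]`.
fn filter_sliced(
    values: &[u8],
    data_offset: usize,
    data_len: usize,
    filter_bits: &[u8],
    filter_offset: usize,
) -> Result<Vec<u8>, String> {
    if filter_offset != 0 {
        // fail loudly rather than silently reading the wrong filter bits
        return Err("filter array with an offset is not supported".to_string());
    }
    let mut out = Vec::new();
    for i in 0..data_len {
        if get_bit(filter_bits, i) {
            // logical index i maps to physical index data_offset + i
            out.push(values[data_offset + i]);
        }
    }
    Ok(out)
}
```

Supporting an offset filter would presumably mean reading bit `filter_offset + i` instead of bit `i`, which is straightforward bit by bit but harder to combine with whole-word batching when the offset is not a multiple of 64.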





[GitHub] [arrow] yordan-pavlov commented on a change in pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#discussion_r460432008



##########
File path: rust/arrow/src/compute/kernels/filter.rs
##########
@@ -17,139 +17,443 @@
 
 //! Defines miscellaneous array kernels.
 
-use std::sync::Arc;
-
 use crate::array::*;
 use crate::datatypes::{ArrowNumericType, DataType, TimeUnit};
 use crate::error::{ArrowError, Result};
+use crate::record_batch::RecordBatch;
+use crate::{
+    bitmap::Bitmap,
+    buffer::{Buffer, MutableBuffer},
+    memory,
+    util::bit_util,
+};
+use std::{mem, sync::Arc};
 
-/// Helper function to perform boolean lambda function on values from two arrays.
-fn bool_op<T, F>(
-    left: &PrimitiveArray<T>,
-    right: &PrimitiveArray<T>,
-    op: F,
-) -> Result<BooleanArray>
-where
-    T: ArrowNumericType,
-    F: Fn(Option<T::Native>, Option<T::Native>) -> bool,
-{
-    if left.len() != right.len() {
-        return Err(ArrowError::ComputeError(
-            "Cannot perform math operation on arrays of different length".to_string(),
-        ));
+/// trait for copying filtered null bitmap bits
+trait CopyNullBit {
+    fn copy_null_bit(&mut self, source_index: usize);
+    fn null_count(&self) -> usize;
+    fn null_buffer(&mut self) -> Buffer;
+}
+
+/// no-op null bitmap copy implementation,
+/// used when the filtered data array doesn't have a null bitmap
+struct NullBitNoop {}
+
+impl NullBitNoop {
+    fn new() -> Self {
+        NullBitNoop {}
     }
-    let mut b = BooleanArray::builder(left.len());
-    for i in 0..left.len() {
-        let index = i;
-        let l = if left.is_null(i) {
-            None
-        } else {
-            Some(left.value(index))
-        };
-        let r = if right.is_null(i) {
-            None
-        } else {
-            Some(right.value(index))
-        };
-        b.append_value(op(l, r))?;
+}
+
+impl CopyNullBit for NullBitNoop {
+    #[inline]
+    fn copy_null_bit(&mut self, _source_index: usize) {
+        // do nothing
+    }
+
+    fn null_count(&self) -> usize {
+        0
+    }
+
+    fn null_buffer(&mut self) -> Buffer {
+        Buffer::from([0u8; 0])
     }
-    Ok(b.finish())
 }
 
-macro_rules! filter_array {
-    ($array:expr, $filter:expr, $array_type:ident) => {{
-        let b = $array.as_any().downcast_ref::<$array_type>().unwrap();
-        let mut builder = $array_type::builder(b.len());
-        for i in 0..b.len() {
-            if $filter.value(i) {
-                if b.is_null(i) {
-                    builder.append_null()?;
-                } else {
-                    builder.append_value(b.value(i))?;
-                }
-            }
-        }
-        Ok(Arc::new(builder.finish()))
-    }};
+/// null bitmap copy implementation,
+/// used when the filtered data array has a null bitmap
+struct NullBitSetter<'a> {
+    target_buffer: MutableBuffer,
+    source_bytes: &'a [u8],
+    target_index: usize,
+    null_count: usize,
 }
 
-/// Returns the array, taking only the elements matching the filter
-pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
-    match array.data_type() {
-        DataType::UInt8 => filter_array!(array, filter, UInt8Array),
-        DataType::UInt16 => filter_array!(array, filter, UInt16Array),
-        DataType::UInt32 => filter_array!(array, filter, UInt32Array),
-        DataType::UInt64 => filter_array!(array, filter, UInt64Array),
-        DataType::Int8 => filter_array!(array, filter, Int8Array),
-        DataType::Int16 => filter_array!(array, filter, Int16Array),
-        DataType::Int32 => filter_array!(array, filter, Int32Array),
-        DataType::Int64 => filter_array!(array, filter, Int64Array),
-        DataType::Float32 => filter_array!(array, filter, Float32Array),
-        DataType::Float64 => filter_array!(array, filter, Float64Array),
-        DataType::Boolean => filter_array!(array, filter, BooleanArray),
-        DataType::Date32(_) => filter_array!(array, filter, Date32Array),
-        DataType::Date64(_) => filter_array!(array, filter, Date64Array),
-        DataType::Time32(TimeUnit::Second) => {
-            filter_array!(array, filter, Time32SecondArray)
-        }
-        DataType::Time32(TimeUnit::Millisecond) => {
-            filter_array!(array, filter, Time32MillisecondArray)
-        }
-        DataType::Time64(TimeUnit::Microsecond) => {
-            filter_array!(array, filter, Time64MicrosecondArray)
-        }
-        DataType::Time64(TimeUnit::Nanosecond) => {
-            filter_array!(array, filter, Time64NanosecondArray)
-        }
-        DataType::Duration(TimeUnit::Second) => {
-            filter_array!(array, filter, DurationSecondArray)
-        }
-        DataType::Duration(TimeUnit::Millisecond) => {
-            filter_array!(array, filter, DurationMillisecondArray)
+impl<'a> NullBitSetter<'a> {
+    fn new(null_bitmap: &'a Bitmap) -> Self {
+        let null_bytes = null_bitmap.buffer_ref().data();
+        // create null bitmap buffer with same length and initialize null bitmap buffer to 1s
+        let null_buffer =
+            MutableBuffer::new(null_bytes.len()).with_bitset(null_bytes.len(), true);
+        NullBitSetter {
+            source_bytes: null_bytes,
+            target_buffer: null_buffer,
+            target_index: 0,
+            null_count: 0,
         }
-        DataType::Duration(TimeUnit::Microsecond) => {
-            filter_array!(array, filter, DurationMicrosecondArray)
-        }
-        DataType::Duration(TimeUnit::Nanosecond) => {
-            filter_array!(array, filter, DurationNanosecondArray)
-        }
-        DataType::Timestamp(TimeUnit::Second, _) => {
-            filter_array!(array, filter, TimestampSecondArray)
+    }
+}
+
+impl<'a> CopyNullBit for NullBitSetter<'a> {
+    #[inline]
+    fn copy_null_bit(&mut self, source_index: usize) {
+        if !bit_util::get_bit(self.source_bytes, source_index) {
+            // this is not actually unsafe because of the condition above + target_buffer.len() == source_bytes.len()
+            unsafe {
+                bit_util::unset_bit_raw(
+                    self.target_buffer.raw_data_mut(),
+                    self.target_index,
+                );
+            }
+            self.null_count += 1;
         }
-        DataType::Timestamp(TimeUnit::Millisecond, _) => {
-            filter_array!(array, filter, TimestampMillisecondArray)
+        self.target_index += 1;
+    }
+
+    fn null_count(&self) -> usize {
+        self.null_count
+    }
+
+    fn null_buffer(&mut self) -> Buffer {
+        self.target_buffer.resize(self.target_index).unwrap();
+        // use mem::replace to detach self.target_buffer from self so that it can be returned
+        let target_buffer = mem::replace(&mut self.target_buffer, MutableBuffer::new(0));
+        target_buffer.freeze()
+    }
+}
+
+fn get_null_bit_setter<'a>(data_array: &'a impl Array) -> Box<CopyNullBit + 'a> {
+    if let Some(null_bitmap) = data_array.data_ref().null_bitmap() {
+        // only return an actual null bit copy implementation if null_bitmap is set
+        Box::new(NullBitSetter::new(null_bitmap))
+    } else {
+        // otherwise return a no-op copy null bit implementation
+        // for improved performance when the filtered array doesn't contain NULLs
+        Box::new(NullBitNoop::new())
+    }
+}
+
+// transmute filter array to u64
+// - optimize filtering with highly selective filters by skipping entire batches of 64 filter bits
+// - if the data array being filtered doesn't have a null bitmap, no time is wasted to copy a null bitmap
+fn filter_array_impl(
+    filter_context: &FilterContext,
+    data_array: &impl Array,
+    array_type: DataType,
+    value_size: usize,
+) -> Result<ArrayDataBuilder> {
+    if filter_context.filter_len > data_array.len() {
+        return Err(ArrowError::ComputeError(
+            "Filter array cannot be larger than data array".to_string(),
+        ));
+    }
+    let filtered_count = filter_context.filtered_count;
+    let filter_mask = &filter_context.filter_mask;
+    let filter_u64 = &filter_context.filter_u64;
+    let data_start = data_array.data_ref().buffers()[0].raw_data();
+    let mut value_buffer = MutableBuffer::new(filtered_count * value_size);
+    value_buffer.resize(filtered_count * value_size)?;
+    let mut value_position = value_buffer.raw_data_mut();
+    let mut null_bit_setter = get_null_bit_setter(data_array);
+    let null_bit_setter = null_bit_setter.as_mut();
+
+    for (i, filter_batch) in filter_u64.iter().enumerate() {
+        // foreach u64 batch
+        let filter_batch = *filter_batch;
+        if filter_batch == 0 {
+            // if batch == 0: skip
+            continue;
         }
-        DataType::Timestamp(TimeUnit::Microsecond, _) => {
-            filter_array!(array, filter, TimestampMicrosecondArray)
+        for (j, filter_mask) in filter_mask.iter().enumerate() {
+            // foreach bit in batch:
+            if (filter_batch & *filter_mask) != 0 {
+                let data_index = (i * 64) + j;
+                null_bit_setter.copy_null_bit(data_index);
+                // if filter bit == 1: copy data value to temp array
+                unsafe {
+                    // this should be safe because of the data_array.len() check at the beginning of the method
+                    memory::memcpy(
+                        value_position,
+                        data_start.add(value_size * data_index),
+                        value_size,
+                    );
+                    value_position = value_position.add(value_size);
+                }
+            }
         }
-        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
-            filter_array!(array, filter, TimestampNanosecondArray)
+    }
+
+    let mut array_data_builder = ArrayDataBuilder::new(array_type)
+        .len(filtered_count)
+        .add_buffer(value_buffer.freeze());
+    if null_bit_setter.null_count() > 0 {
+        array_data_builder = array_data_builder
+            .null_count(null_bit_setter.null_count())
+            .null_bit_buffer(null_bit_setter.null_buffer());
+    }
+
+    Ok(array_data_builder)
+}
+
+/// FilterContext can be used to improve performance when
+/// filtering multiple data arrays with the same filter array.
+#[derive(Debug)]
+pub struct FilterContext {
+    filter_u64: Vec<u64>,
+    filter_len: usize,
+    filtered_count: usize,
+    filter_mask: Vec<u64>,
+}
+
+macro_rules! filter_primitive_array {
+    ($context:expr, $array:expr, $array_type:ident) => {{
+        let input_array = $array.as_any().downcast_ref::<$array_type>().unwrap();
+        let output_array = $context.filter_primitive_array(input_array)?;
+        Ok(Arc::new(output_array))
+    }};
+}
+
+macro_rules! filter_dictionary_array {
+    ($context:expr, $array:expr, $array_type:ident) => {{
+        let input_array = $array.as_any().downcast_ref::<$array_type>().unwrap();
+        let output_array = $context.filter_dictionary_array(input_array)?;
+        Ok(Arc::new(output_array))
+    }};
+}
+
+impl FilterContext {
+    /// Returns a new instance of FilterContext
+    pub fn new(filter_array: &BooleanArray) -> Self {
+        let filter_mask: Vec<u64> = (0..64).map(|x| 1u64 << x).collect();
+        let filter_bytes = filter_array.data_ref().buffers()[0].data();
+        let filtered_count = bit_util::count_set_bits(filter_bytes);
+        // transmute filter_bytes to &[u64]
+        let mut u64_buffer = MutableBuffer::new(filter_bytes.len());
+        u64_buffer
+            .write_bytes(filter_bytes, u64_buffer.capacity() - filter_bytes.len())
+            .unwrap();
+        let filter_u64 = u64_buffer.typed_data_mut::<u64>().to_owned();
+        FilterContext {
+            filter_u64,
+            filter_len: filter_array.len(),
+            filtered_count,
+            filter_mask,
         }
-        DataType::Binary => {
-            let b = array.as_any().downcast_ref::<BinaryArray>().unwrap();
-            let mut values: Vec<&[u8]> = Vec::with_capacity(b.len());
-            for i in 0..b.len() {
-                if filter.value(i) {
-                    values.push(b.value(i));
+    }
+
+    /// Returns a new array, containing only the elements matching the filter
+    pub fn filter(&self, array: &Array) -> Result<ArrayRef> {
+        match array.data_type() {
+            DataType::UInt8 => filter_primitive_array!(self, array, UInt8Array),
+            DataType::UInt16 => filter_primitive_array!(self, array, UInt16Array),
+            DataType::UInt32 => filter_primitive_array!(self, array, UInt32Array),
+            DataType::UInt64 => filter_primitive_array!(self, array, UInt64Array),
+            DataType::Int8 => filter_primitive_array!(self, array, Int8Array),
+            DataType::Int16 => filter_primitive_array!(self, array, Int16Array),
+            DataType::Int32 => filter_primitive_array!(self, array, Int32Array),
+            DataType::Int64 => filter_primitive_array!(self, array, Int64Array),
+            DataType::Float32 => filter_primitive_array!(self, array, Float32Array),
+            DataType::Float64 => filter_primitive_array!(self, array, Float64Array),
+            DataType::Boolean => {
+                let input_array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
+                let mut builder = BooleanArray::builder(self.filtered_count);
+                for i in 0..self.filter_u64.len() {
+                    // foreach u64 batch
+                    let filter_batch = self.filter_u64[i];
+                    if filter_batch == 0 {
+                        // if batch == 0: skip
+                        continue;
+                    }
+                    for j in 0..64 {
+                        // foreach bit in batch:
+                        if (filter_batch & self.filter_mask[j]) != 0 {
+                            let data_index = (i * 64) + j;
+                            if input_array.is_null(data_index) {
+                                builder.append_null()?;
+                            } else {
+                                builder.append_value(input_array.value(data_index))?;
+                            }
+                        }
+                    }
                 }
+                Ok(Arc::new(builder.finish()))
+            },
+            DataType::Date32(_) => filter_primitive_array!(self, array, Date32Array),
+            DataType::Date64(_) => filter_primitive_array!(self, array, Date64Array),
+            DataType::Time32(TimeUnit::Second) => {
+                filter_primitive_array!(self, array, Time32SecondArray)
             }
-            Ok(Arc::new(BinaryArray::from(values)))
-        }
-        DataType::Utf8 => {
-            let b = array.as_any().downcast_ref::<StringArray>().unwrap();
-            let mut values: Vec<&str> = Vec::with_capacity(b.len());
-            for i in 0..b.len() {
-                if filter.value(i) {
-                    values.push(b.value(i));
+            DataType::Time32(TimeUnit::Millisecond) => {
+                filter_primitive_array!(self, array, Time32MillisecondArray)
+            }
+            DataType::Time64(TimeUnit::Microsecond) => {
+                filter_primitive_array!(self, array, Time64MicrosecondArray)
+            }
+            DataType::Time64(TimeUnit::Nanosecond) => {
+                filter_primitive_array!(self, array, Time64NanosecondArray)
+            }
+            DataType::Duration(TimeUnit::Second) => {
+                filter_primitive_array!(self, array, DurationSecondArray)
+            }
+            DataType::Duration(TimeUnit::Millisecond) => {
+                filter_primitive_array!(self, array, DurationMillisecondArray)
+            }
+            DataType::Duration(TimeUnit::Microsecond) => {
+                filter_primitive_array!(self, array, DurationMicrosecondArray)
+            }
+            DataType::Duration(TimeUnit::Nanosecond) => {
+                filter_primitive_array!(self, array, DurationNanosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Second, _) => {
+                filter_primitive_array!(self, array, TimestampSecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Millisecond, _) => {
+                filter_primitive_array!(self, array, TimestampMillisecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Microsecond, _) => {
+                filter_primitive_array!(self, array, TimestampMicrosecondArray)
+            }
+            DataType::Timestamp(TimeUnit::Nanosecond, _) => {
+                filter_primitive_array!(self, array, TimestampNanosecondArray)
+            }
+            DataType::Binary => {
+                let input_array = array.as_any().downcast_ref::<BinaryArray>().unwrap();
+                let mut values: Vec<&[u8]> = Vec::with_capacity(self.filtered_count);
+                for i in 0..self.filter_u64.len() {
+                    // foreach u64 batch
+                    let filter_batch = self.filter_u64[i];
+                    if filter_batch == 0 {
+                        // if batch == 0: skip
+                        continue;
+                    }
+                    for j in 0..64 {
+                        // foreach bit in batch:
+                        if (filter_batch & self.filter_mask[j]) != 0 {
+                            let data_index = (i * 64) + j;
+                            values.push(input_array.value(data_index));
+                        }
+                    }
                 }
+                Ok(Arc::new(BinaryArray::from(values)))
             }
-            Ok(Arc::new(StringArray::from(values)))
+            DataType::Utf8 => {
+                let input_array = array.as_any().downcast_ref::<StringArray>().unwrap();
+                let mut values: Vec<&str> = Vec::with_capacity(self.filtered_count);
+                for i in 0..self.filter_u64.len() {
+                    // foreach u64 batch
+                    let filter_batch = self.filter_u64[i];
+                    if filter_batch == 0 {
+                        // if batch == 0: skip
+                        continue;
+                    }
+                    for j in 0..64 {
+                        // foreach bit in batch:
+                        if (filter_batch & self.filter_mask[j]) != 0 {
+                            let data_index = (i * 64) + j;
+                            values.push(input_array.value(data_index));
+                        }
+                    }
+                }
+                Ok(Arc::new(StringArray::from(values)))
+            }
+            DataType::Dictionary(ref key_type, ref value_type) => match (key_type.as_ref(), value_type.as_ref()) {
+                (key_type, DataType::Utf8) => match key_type {
+                    DataType::UInt8 => filter_dictionary_array!(self, array, UInt8DictionaryArray),
+                    DataType::UInt16 => filter_dictionary_array!(self, array, UInt16DictionaryArray),
+                    DataType::UInt32 => filter_dictionary_array!(self, array, UInt32DictionaryArray),
+                    DataType::UInt64 => filter_dictionary_array!(self, array, UInt64DictionaryArray),
+                    DataType::Int8 => filter_dictionary_array!(self, array, Int8DictionaryArray),
+                    DataType::Int16 => filter_dictionary_array!(self, array, Int16DictionaryArray),
+                    DataType::Int32 => filter_dictionary_array!(self, array, Int32DictionaryArray),
+                    DataType::Int64 => filter_dictionary_array!(self, array, Int64DictionaryArray),
+                    other => Err(ArrowError::ComputeError(format!(
+                        "filter not supported for string dictionary with key of type {:?}",
+                        other
+                    )))
+                }
+                (key_type, value_type) => Err(ArrowError::ComputeError(format!(
+                    "filter not supported for Dictionary({:?}, {:?})",
+                    key_type, value_type
+                )))
+            }
+            other => Err(ArrowError::ComputeError(format!(
+                "filter not supported for {:?}",
+                other
+            ))),
         }
-        other => Err(ArrowError::ComputeError(format!(
-            "filter not supported for {:?}",
-            other
-        ))),
     }
+
+    /// Returns a new PrimitiveArray<T> containing only those values from the array passed as the data_array parameter,
+    /// selected by the BooleanArray passed as the filter_array parameter
+    pub fn filter_primitive_array<T>(
+        &self,
+        data_array: &PrimitiveArray<T>,
+    ) -> Result<PrimitiveArray<T>>
+    where
+        T: ArrowNumericType,
+    {
+        let array_type = T::get_data_type();
+        let value_size = mem::size_of::<T::Native>();
+        let array_data_builder =
+            filter_array_impl(self, data_array, array_type, value_size)?;
+        let data = array_data_builder.build();
+        Ok(PrimitiveArray::<T>::from(data))
+    }
+
+    /// Returns a new DictionaryArray<T> containing only those keys from the array passed as the data_array parameter,
+    /// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array.
+    pub fn filter_dictionary_array<T>(
+        &self,
+        data_array: &DictionaryArray<T>,
+    ) -> Result<DictionaryArray<T>>
+    where
+        T: ArrowNumericType,
+    {
+        let array_type = data_array.data_type().clone();
+        let value_size = mem::size_of::<T::Native>();
+        let mut array_data_builder =
+            filter_array_impl(self, data_array, array_type, value_size)?;
+        // copy dictionary values from input array
+        array_data_builder =
+            array_data_builder.add_child_data(data_array.values().data());
+        let data = array_data_builder.build();
+        Ok(DictionaryArray::<T>::from(data))
+    }
+}
+
+/// Returns a new array, containing only the elements matching the filter.
+pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
+    FilterContext::new(filter).filter(array)
+}
+
+/// Returns a new PrimitiveArray<T> containing only those values from the array passed as the data_array parameter,
+/// selected by the BooleanArray passed as the filter_array parameter
+pub fn filter_primitive_array<T>(
+    data_array: &PrimitiveArray<T>,
+    filter_array: &BooleanArray,
+) -> Result<PrimitiveArray<T>>
+where
+    T: ArrowNumericType,
+{
+    FilterContext::new(filter_array).filter_primitive_array(data_array)
+}
+
+/// Returns a new DictionaryArray<T> containing only those keys from the array passed as the data_array parameter,
+/// selected by the BooleanArray passed as the filter_array parameter. The values are cloned from the data_array.
+pub fn filter_dictionary_array<T>(
+    data_array: &DictionaryArray<T>,
+    filter_array: &BooleanArray,
+) -> Result<DictionaryArray<T>>
+where
+    T: ArrowNumericType,
+{
+    FilterContext::new(filter_array).filter_dictionary_array(data_array)
+}
+
+/// Returns a new RecordBatch with arrays containing only values matching the filter.
+/// The same FilterContext is re-used when filtering arrays in the RecordBatch for better performance.
+pub fn filter_record_batch(
+    record_batch: &RecordBatch,
+    filter_array: &BooleanArray,
+) -> Result<RecordBatch> {
+    let filter_context = FilterContext::new(filter_array);
+    let filtered_arrays = record_batch
+        .columns()
+        .iter()
+        .map(|a| filter_context.filter(a.as_ref()).unwrap())

Review comment:
       Good spot. I have changed the `filter_record_batch` method to remove the `unwrap()` and use `.collect::<Result<Vec<ArrayRef>>>()?;` instead, so an error from filtering any column is returned to the caller rather than causing a panic.
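
The change described above relies on `Result` implementing `FromIterator`, so an iterator of `Result` values can be collected into a single `Result<Vec<_>>` that stops at the first error. Below is a minimal sketch of that pattern; `filter_column` and `filter_columns` are hypothetical stand-ins that use plain `Vec<i32>` columns and a `String` error instead of Arrow arrays and `ArrowError`.

```rust
/// Hypothetical stand-in for filtering one column; fails if the lengths differ.
fn filter_column(column: &[i32], mask: &[bool]) -> Result<Vec<i32>, String> {
    if mask.len() != column.len() {
        return Err(format!(
            "mask length {} != column length {}",
            mask.len(),
            column.len()
        ));
    }
    Ok(column
        .iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(value, _)| *value)
        .collect())
}

/// Filters every column; the first error short-circuits the collect
/// and is returned to the caller instead of panicking via unwrap().
fn filter_columns(columns: &[Vec<i32>], mask: &[bool]) -> Result<Vec<Vec<i32>>, String> {
    columns
        .iter()
        .map(|column| filter_column(column, mask))
        .collect::<Result<Vec<_>, String>>()
}

fn main() {
    let mask = [true, false, true];
    let filtered = filter_columns(&[vec![1, 2, 3], vec![4, 5, 6]], &mask);
    assert_eq!(filtered, Ok(vec![vec![1, 3], vec![4, 6]]));

    // The second column is too short, so the whole call reports an error.
    let failed = filter_columns(&[vec![1, 2, 3], vec![4, 5]], &mask);
    assert!(failed.is_err());
}
```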




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [arrow] yordan-pavlov edited a comment on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
yordan-pavlov edited a comment on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-663888189


   @jorgecarleitao thanks for the feedback
   
   I did some more profiling specifically around the unsafe parts of the code and found that the safe version of `copy_null_bit` is just as fast, so I have removed that unsafe section. Here are some benchmark results:
   
   copy_null_bit unsafe:
   filter context u8 w NULLs low selectivity  time:   [142.05 us 142.35 us 142.68 us]
   filter context u8 w NULLs high selectivity time:   [2.0915 us 2.1015 us 2.1127 us]
   
   copy_null_bit safe:
   filter context u8 w NULLs low selectivity  time:   [134.74 us 134.86 us 134.98 us]
   filter context u8 w NULLs high selectivity time:   [2.0536 us 2.0613 us 2.0707 us]
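
For illustration, a safe bit-by-bit copy of validity bits can be sketched as below. This is not the PR's code: `get_bit`, `unset_bit` and `copy_selected_null_bits` are hypothetical helpers written here over plain byte slices, assuming the least-significant-bit-first layout that Arrow validity bitmaps use and a target bitmap pre-filled with 1s.

```rust
/// Returns true if bit `i` is set (least significant bit first within each byte).
fn get_bit(bytes: &[u8], i: usize) -> bool {
    (bytes[i / 8] & (1 << (i % 8))) != 0
}

/// Clears bit `i`.
fn unset_bit(bytes: &mut [u8], i: usize) {
    bytes[i / 8] &= !(1 << (i % 8));
}

/// Copies the validity bits found at the `selected` source positions into a
/// new bitmap that starts as all 1s, and counts how many nulls were copied.
/// All indexing is bounds-checked, so no `unsafe` is needed.
fn copy_selected_null_bits(source: &[u8], selected: &[usize]) -> (Vec<u8>, usize) {
    let mut target = vec![0xFFu8; (selected.len() + 7) / 8];
    let mut null_count = 0;
    for (target_index, &source_index) in selected.iter().enumerate() {
        if !get_bit(source, source_index) {
            unset_bit(&mut target, target_index);
            null_count += 1;
        }
    }
    (target, null_count)
}

fn main() {
    // Bits 0 and 2 are valid; bit 1 is null.
    let source = [0b0000_0101u8];
    let (bitmap, nulls) = copy_selected_null_bits(&source, &[0, 1, 2]);
    // Only target bit 1 is cleared; the unused padding bits stay set.
    assert_eq!(bitmap, vec![0b1111_1101u8]);
    assert_eq!(nulls, 1);
}
```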
   
   I also benchmarked replacing the unsafe section in the `filter_array_impl` method with `value_buffer.write()`, but this results in an approximately 17% drop in performance with sparse filter arrays, as can be seen from the benchmark results below:
   
   filter u8 low selectivity
                           time:   [131.08 us 132.46 us 134.27 us]
                           change: [+13.141% +17.189% +22.115%] (p = 0.00 < 0.05)
   
   filter context u8 low selectivity
                           time:   [127.47 us 129.27 us 131.56 us]
                           change: [+12.008% +19.674% +27.939%] (p = 0.00 < 0.05)
   
   filter context u8 w NULLs low selectivity
                           time:   [154.32 us 155.27 us 156.79 us]
                           change: [+15.444% +17.846% +22.268%] (p = 0.00 < 0.05)
   
   filter context f32 low selectivity
                           time:   [137.62 us 138.01 us 138.52 us]
                           change: [+12.495% +18.180% +23.088%] (p = 0.00 < 0.05)
   
   Finally, looking at the C++ implementation inspired me to add a special case to the `filter_array_impl` method for when the 64-bit filter batch is all 1s. This doesn't appear to reduce performance in other cases, but it makes filtering with very dense filter arrays (almost all 1s) about 20 times faster. Here are the latest benchmark results:
   
   filter u8 low selectivity                      time:   [109.75 us 110.30 us 111.02 us]
   filter u8 high selectivity                     time:   [4.8372 us 4.8433 us 4.8502 us]
   filter u8 very low selectivity                 time:   [11.782 us 11.798 us 11.816 us]
   filter context u8 low selectivity              time:   [109.07 us 109.65 us 110.66 us]
   filter context u8 high selectivity             time:   [1.4704 us 1.4762 us 1.4842 us]
   filter context u8 very low selectivity         time:   [8.8455 us 9.0530 us 9.3171 us]
   filter context u8 w NULLs low selectivity      time:   [135.32 us 135.49 us 135.66 us]
   filter context u8 w NULLs high selectivity     time:   [2.0579 us 2.0680 us 2.0796 us]
   filter context u8 w NULLs very low selectivity time:   [11.583 us 11.668 us 11.780 us]
   filter context f32 low selectivity             time:   [138.71 us 139.83 us 141.41 us]
   filter context f32 high selectivity            time:   [1.6111 us 1.6342 us 1.6605 us]
   filter context f32 very low selectivity        time:   [19.719 us 19.865 us 20.045 us]
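
The batch handling described above can be sketched roughly as follows. This is a simplified illustration, not the PR's implementation: `filter_by_u64_batches` is a hypothetical function over a plain `u8` slice, it ignores null bitmaps, and it assumes bit `j` of mask word `i` covers `values[i * 64 + j]`.

```rust
/// Filters `values` using a mask packed into u64 words.
fn filter_by_u64_batches(values: &[u8], mask: &[u64]) -> Vec<u8> {
    let mut out = Vec::new();
    for (i, &batch) in mask.iter().enumerate() {
        let start = i * 64;
        if batch == 0 {
            // No bits set: skip all 64 values at once.
            continue;
        } else if batch == !0u64 && start + 64 <= values.len() {
            // All bits set: copy the 64 values in one go.
            out.extend_from_slice(&values[start..start + 64]);
        } else {
            // Mixed batch: test each bit individually.
            for j in 0..64 {
                let index = start + j;
                if index < values.len() && ((batch >> j) & 1) == 1 {
                    out.push(values[index]);
                }
            }
        }
    }
    out
}

fn main() {
    let values: Vec<u8> = (0u8..192).collect();
    // Word 0 selects nothing, word 1 selects everything, word 2 selects bits 0 and 2.
    let out = filter_by_u64_batches(&values, &[0, !0u64, 0b101]);
    assert_eq!(out.len(), 66);
    assert_eq!(out[0], 64);
    assert_eq!(out[63], 127);
    assert_eq!(out[64..].to_vec(), vec![128u8, 130u8]);
}
```

Both fast paths depend only on the mask word, so the per-bit loop runs only for mixed batches.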
   
   @andygrove  any thoughts?





[GitHub] [arrow] yordan-pavlov commented on a change in pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on a change in pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#discussion_r461149342



##########
File path: rust/arrow/src/compute/kernels/filter.rs
##########
@@ -17,139 +17,466 @@
 
 //! Defines miscellaneous array kernels.
 
-use std::sync::Arc;
-
 use crate::array::*;
 use crate::datatypes::{ArrowNumericType, DataType, TimeUnit};
 use crate::error::{ArrowError, Result};
+use crate::record_batch::RecordBatch;
+use crate::{
+    bitmap::Bitmap,
+    buffer::{Buffer, MutableBuffer},
+    memory,
+    util::bit_util,
+};
+use std::{mem, sync::Arc};
 
-/// Helper function to perform boolean lambda function on values from two arrays.
-fn bool_op<T, F>(
-    left: &PrimitiveArray<T>,
-    right: &PrimitiveArray<T>,
-    op: F,
-) -> Result<BooleanArray>
-where
-    T: ArrowNumericType,
-    F: Fn(Option<T::Native>, Option<T::Native>) -> bool,
-{
-    if left.len() != right.len() {
-        return Err(ArrowError::ComputeError(
-            "Cannot perform math operation on arrays of different length".to_string(),
-        ));
+/// trait for copying filtered null bitmap bits
+trait CopyNullBit {
+    fn copy_null_bit(&mut self, source_index: usize);
+    fn copy_null_bits(&mut self, source_index: usize, count: usize);
+    fn null_count(&self) -> usize;
+    fn null_buffer(&mut self) -> Buffer;
+}
+
+/// no-op null bitmap copy implementation,
+/// used when the filtered data array doesn't have a null bitmap
+struct NullBitNoop {}
+
+impl NullBitNoop {
+    fn new() -> Self {
+        NullBitNoop {}
     }
-    let mut b = BooleanArray::builder(left.len());
-    for i in 0..left.len() {
-        let index = i;
-        let l = if left.is_null(i) {
-            None
-        } else {
-            Some(left.value(index))
-        };
-        let r = if right.is_null(i) {
-            None
-        } else {
-            Some(right.value(index))
-        };
-        b.append_value(op(l, r))?;
+}
+
+impl CopyNullBit for NullBitNoop {
+    #[inline]
+    fn copy_null_bit(&mut self, _source_index: usize) {
+        // do nothing
+    }
+
+    #[inline]
+    fn copy_null_bits(&mut self, _source_index: usize, _count: usize) {
+        // do nothing
+    }
+
+    fn null_count(&self) -> usize {
+        0
+    }
+
+    fn null_buffer(&mut self) -> Buffer {
+        Buffer::from([0u8; 0])
     }
-    Ok(b.finish())
 }
 
-macro_rules! filter_array {
-    ($array:expr, $filter:expr, $array_type:ident) => {{
-        let b = $array.as_any().downcast_ref::<$array_type>().unwrap();
-        let mut builder = $array_type::builder(b.len());
-        for i in 0..b.len() {
-            if $filter.value(i) {
-                if b.is_null(i) {
-                    builder.append_null()?;
-                } else {
-                    builder.append_value(b.value(i))?;
-                }
-            }
-        }
-        Ok(Arc::new(builder.finish()))
-    }};
+/// null bitmap copy implementation,
+/// used when the filtered data array has a null bitmap
+struct NullBitSetter<'a> {
+    target_buffer: MutableBuffer,
+    source_bytes: &'a [u8],
+    target_index: usize,
+    null_count: usize,
 }
 
-/// Returns the array, taking only the elements matching the filter
-pub fn filter(array: &Array, filter: &BooleanArray) -> Result<ArrayRef> {
-    match array.data_type() {
-        DataType::UInt8 => filter_array!(array, filter, UInt8Array),
-        DataType::UInt16 => filter_array!(array, filter, UInt16Array),
-        DataType::UInt32 => filter_array!(array, filter, UInt32Array),
-        DataType::UInt64 => filter_array!(array, filter, UInt64Array),
-        DataType::Int8 => filter_array!(array, filter, Int8Array),
-        DataType::Int16 => filter_array!(array, filter, Int16Array),
-        DataType::Int32 => filter_array!(array, filter, Int32Array),
-        DataType::Int64 => filter_array!(array, filter, Int64Array),
-        DataType::Float32 => filter_array!(array, filter, Float32Array),
-        DataType::Float64 => filter_array!(array, filter, Float64Array),
-        DataType::Boolean => filter_array!(array, filter, BooleanArray),
-        DataType::Date32(_) => filter_array!(array, filter, Date32Array),
-        DataType::Date64(_) => filter_array!(array, filter, Date64Array),
-        DataType::Time32(TimeUnit::Second) => {
-            filter_array!(array, filter, Time32SecondArray)
-        }
-        DataType::Time32(TimeUnit::Millisecond) => {
-            filter_array!(array, filter, Time32MillisecondArray)
-        }
-        DataType::Time64(TimeUnit::Microsecond) => {
-            filter_array!(array, filter, Time64MicrosecondArray)
-        }
-        DataType::Time64(TimeUnit::Nanosecond) => {
-            filter_array!(array, filter, Time64NanosecondArray)
-        }
-        DataType::Duration(TimeUnit::Second) => {
-            filter_array!(array, filter, DurationSecondArray)
-        }
-        DataType::Duration(TimeUnit::Millisecond) => {
-            filter_array!(array, filter, DurationMillisecondArray)
-        }
-        DataType::Duration(TimeUnit::Microsecond) => {
-            filter_array!(array, filter, DurationMicrosecondArray)
+impl<'a> NullBitSetter<'a> {
+    fn new(null_bitmap: &'a Bitmap) -> Self {
+        let null_bytes = null_bitmap.buffer_ref().data();
+        // create null bitmap buffer with same length and initialize null bitmap buffer to 1s
+        let null_buffer =
+            MutableBuffer::new(null_bytes.len()).with_bitset(null_bytes.len(), true);
+        NullBitSetter {
+            source_bytes: null_bytes,
+            target_buffer: null_buffer,
+            target_index: 0,
+            null_count: 0,
         }
-        DataType::Duration(TimeUnit::Nanosecond) => {
-            filter_array!(array, filter, DurationNanosecondArray)
+    }
+}
+
+impl<'a> CopyNullBit for NullBitSetter<'a> {
+    #[inline]
+    fn copy_null_bit(&mut self, source_index: usize) {
+        if !bit_util::get_bit(self.source_bytes, source_index) {
+            bit_util::unset_bit(self.target_buffer.data_mut(), self.target_index);
+            self.null_count += 1;
         }
-        DataType::Timestamp(TimeUnit::Second, _) => {
-            filter_array!(array, filter, TimestampSecondArray)
+        self.target_index += 1;
+    }
+
+    #[inline]
+    fn copy_null_bits(&mut self, source_index: usize, count: usize) {
+        for i in 0..count {
+            self.copy_null_bit(source_index + i);
         }
-        DataType::Timestamp(TimeUnit::Millisecond, _) => {
-            filter_array!(array, filter, TimestampMillisecondArray)
+    }
+
+    fn null_count(&self) -> usize {
+        self.null_count
+    }
+
+    fn null_buffer(&mut self) -> Buffer {
+        self.target_buffer.resize(self.target_index).unwrap();
+        // use mem::replace to detach self.target_buffer from self so that it can be returned
+        let target_buffer = mem::replace(&mut self.target_buffer, MutableBuffer::new(0));
+        target_buffer.freeze()
+    }
+}
+
+fn get_null_bit_setter<'a>(data_array: &'a impl Array) -> Box<CopyNullBit + 'a> {
+    if let Some(null_bitmap) = data_array.data_ref().null_bitmap() {
+        // only return an actual null bit copy implementation if null_bitmap is set
+        Box::new(NullBitSetter::new(null_bitmap))
+    } else {
+        // otherwise return a no-op copy null bit implementation
+        // for improved performance when the filtered array doesn't contain NULLs
+        Box::new(NullBitNoop::new())
+    }
+}
+
+// transmute filter array to u64
+// - optimize filtering with highly selective filters by skipping entire batches of 64 filter bits
+// - if the data array being filtered doesn't have a null bitmap, no time is wasted to copy a null bitmap
+fn filter_array_impl(
+    filter_context: &FilterContext,
+    data_array: &impl Array,
+    array_type: DataType,
+    value_size: usize,
+) -> Result<ArrayDataBuilder> {
+    if filter_context.filter_len > data_array.len() {
+        return Err(ArrowError::ComputeError(
+            "Filter array cannot be larger than data array".to_string(),
+        ));
+    }
+    let filtered_count = filter_context.filtered_count;
+    let filter_mask = &filter_context.filter_mask;
+    let filter_u64 = &filter_context.filter_u64;
+    let data_start = data_array.data_ref().buffers()[0].raw_data();
+    let mut value_buffer = MutableBuffer::new(filtered_count * value_size);
+    value_buffer.resize(filtered_count * value_size)?;
+    let mut value_position = value_buffer.raw_data_mut();
+    let mut null_bit_setter = get_null_bit_setter(data_array);
+    let null_bit_setter = null_bit_setter.as_mut();
+    let all_ones_batch = !0u64;
+
+    for (i, filter_batch) in filter_u64.iter().enumerate() {
+        // foreach u64 batch
+        let filter_batch = *filter_batch;
+        if filter_batch == 0 {
+            // if batch == 0: skip
+            continue;
+        } else if filter_batch == all_ones_batch {
+            // if batch == all 1s: copy all 64 values in one go
+            let data_index = i * 64;
+            let data_len = value_size * 64;
+            null_bit_setter.copy_null_bits(data_index, 64);
+            unsafe {

Review comment:
       @andygrove I have replaced the use of unsafe memcpy with `copy_from_slice`. This does reduce performance (by about 12%), but it is still much faster than the old implementation. Here are the latest benchmark results:
   
   filter u8 low selectivity                      time:   [130.74 us 131.37 us 132.19 us]
   filter u8 high selectivity                     time:   [5.2031 us 5.2366 us 5.2764 us]
   filter u8 very low selectivity                 time:   [12.353 us 12.542 us 12.759 us]
   
   filter context u8 low selectivity              time:   [129.54 us 129.88 us 130.30 us]
   filter context u8 high selectivity             time:   [1.7926 us 1.7974 us 1.8046 us]
   filter context u8 very low selectivity         time:   [8.7700 us 8.7987 us 8.8342 us]
   
   filter context u8 w NULLs low selectivity      time:   [150.36 us 151.09 us 152.01 us]
   filter context u8 w NULLs high selectivity     time:   [2.4173 us 2.4882 us 2.5703 us]
   filter context u8 w NULLs very low selectivity time:   [158.86 us 160.32 us 162.26 us]
   
   filter context f32 low selectivity             time:   [123.38 us 124.34 us 126.18 us]
   filter context f32 high selectivity            time:   [1.4836 us 1.4994 us 1.5297 us]
   filter context f32 very low selectivity        time:   [19.422 us 19.653 us 19.932 us]
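
As a rough illustration of the safe approach, the sketch below copies a run of fixed-size values with `copy_from_slice`. The `copy_values` helper and its parameters are hypothetical and operate on plain byte slices rather than Arrow buffers. The slicing is bounds-checked and `copy_from_slice` panics if the two slices differ in length; these are checks that a raw memcpy would skip.

```rust
/// Copies `count` values of `value_size` bytes each, starting at value index
/// `source_index` in `source`, into `target` at byte offset `target_offset`.
/// Returns the byte offset just past the copied data.
fn copy_values(
    source: &[u8],
    source_index: usize,
    count: usize,
    value_size: usize,
    target: &mut [u8],
    target_offset: usize,
) -> usize {
    let start = source_index * value_size;
    let len = count * value_size;
    // Both slice expressions are bounds-checked; out-of-range indices panic
    // instead of reading or writing out of bounds.
    target[target_offset..target_offset + len]
        .copy_from_slice(&source[start..start + len]);
    target_offset + len
}

fn main() {
    // Four 2-byte values: [1,2] [3,4] [5,6] [7,8]
    let source = [1u8, 2, 3, 4, 5, 6, 7, 8];
    let mut target = [0u8; 6];
    // Copy values 1 and 2, then value 3.
    let offset = copy_values(&source, 1, 2, 2, &mut target, 0);
    let offset = copy_values(&source, 3, 1, 2, &mut target, offset);
    assert_eq!(offset, 6);
    assert_eq!(target, [3, 4, 5, 6, 7, 8]);
}
```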







[GitHub] [arrow] yordan-pavlov commented on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
yordan-pavlov commented on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-663718817


   Just for reference, I think the relevant C++ filter implementation (the one wesm is referring to) is in the PrimitiveFilterImpl class here: https://github.com/apache/arrow/blob/master/cpp/src/arrow/compute/kernels/vector_selection.cc#L558





[GitHub] [arrow] andygrove commented on a change in pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
andygrove commented on a change in pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#discussion_r461228050



##########
File path: rust/arrow/src/compute/kernels/filter.rs
##########

Review comment:
       Thanks @yordan-pavlov. We can always consider the unsafe changes in a future PR, but removing them for now makes it much easier to get these changes merged.







[GitHub] [arrow] yordan-pavlov edited a comment on pull request #7798: ARROW-9523 [Rust] Improve filter kernel performance

Posted by GitBox <gi...@apache.org>.
yordan-pavlov edited a comment on pull request #7798:
URL: https://github.com/apache/arrow/pull/7798#issuecomment-663888189


   @jorgecarleitao thanks for the feedback
   
   I did some more profiling, specifically around the unsafe parts of the code, and found that the safe version of `copy_null_bit` is just as fast, so I have removed that unsafe section. Here are some benchmark results:
   
   copy_null_bit unsafe:
   filter context u8 w NULLs low selectivity  time:   [142.05 us 142.35 us 142.68 us]
   filter context u8 w NULLs high selectivity time:   [2.0915 us 2.1015 us 2.1127 us]
   
   copy_null_bit safe:
   filter context u8 w NULLs low selectivity  time:   [134.74 us 134.86 us 134.98 us]
   filter context u8 w NULLs high selectivity time:   [2.0536 us 2.0613 us 2.0707 us]
   
   I also benchmarked replacing the unsafe section in the `filter_array_impl` method with `value_buffer.write()`, but this results in an approximately 18% drop in performance with sparse filter arrays, as can be seen from the benchmark results below:
   
   filter u8 low selectivity
                           time:   [131.08 us 132.46 us 134.27 us]
                           change: [+13.141% +17.189% +22.115%] (p = 0.00 < 0.05)
   
   filter context u8 low selectivity
                           time:   [127.47 us 129.27 us 131.56 us]
                           change: [+12.008% +19.674% +27.939%] (p = 0.00 < 0.05)
   
   filter context u8 w NULLs low selectivity
                           time:   [154.32 us 155.27 us 156.79 us]
                           change: [+15.444% +17.846% +22.268%] (p = 0.00 < 0.05)
   
   filter context f32 low selectivity
                           time:   [137.62 us 138.01 us 138.52 us]
                           change: [+12.495% +18.180% +23.088%] (p = 0.00 < 0.05)
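
To make the trade-off measured above concrete, here is a minimal sketch of the two copy styles, using only the Rust standard library. It does not use Arrow's `MutableBuffer`, and `gather_raw` and `gather_safe` are hypothetical names invented for illustration. The first function writes into a pre-sized buffer through a raw pointer that advances after each value; the second appends through a bounds-checked slice copy. The sketch is not meant to reproduce the benchmark numbers.

```rust
use std::ptr;

/// Copies the values at `indices` (each `value_size` bytes wide) out of `data`
/// by writing through a raw pointer into a pre-sized output buffer.
fn gather_raw(data: &[u8], value_size: usize, indices: &[usize]) -> Vec<u8> {
    let mut out = vec![0u8; indices.len() * value_size];
    let mut dst = out.as_mut_ptr();
    for &i in indices {
        let start = i * value_size;
        // The source range is checked explicitly; the destination is not.
        assert!(start + value_size <= data.len(), "index out of bounds");
        // SAFETY: the source range was checked above. `out` holds exactly
        // `indices.len() * value_size` bytes and `dst` advances by
        // `value_size` once per index, so every write stays inside `out`.
        unsafe {
            ptr::copy_nonoverlapping(data.as_ptr().add(start), dst, value_size);
            dst = dst.add(value_size);
        }
    }
    out
}

/// Same result as `gather_raw`, but every copy goes through a
/// bounds-checked slice and a length-tracking append.
fn gather_safe(data: &[u8], value_size: usize, indices: &[usize]) -> Vec<u8> {
    let mut out = Vec::with_capacity(indices.len() * value_size);
    for &i in indices {
        let start = i * value_size;
        out.extend_from_slice(&data[start..start + value_size]);
    }
    out
}
```

Both functions return the same bytes. Any difference in speed depends on how much of the per-value checking the compiler can remove, which is why it has to be measured rather than assumed.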
   
   Finally, looking at the C++ implementation inspired me to add a special case to the `filter_array_impl` method for when the 64-bit filter batch is all 1s. This doesn't appear to reduce performance in other cases, but it improves the performance of filtering with very dense filter arrays (almost all 1s) by about 20 times. Here are the latest benchmark results:
   
   filter u8 low selectivity                      time:   [118.21 us 118.82 us 119.61 us]
   filter u8 high selectivity                     time:   [4.9893 us 4.9941 us 4.9998 us]
   filter u8 very low selectivity                 time:   [11.861 us 11.919 us 11.987 us]
   
   filter context u8 low selectivity              time:   [115.22 us 115.77 us 116.36 us]
   filter context u8 high selectivity             time:   [1.6571 us 1.6784 us 1.7033 us]
   filter context u8 very low selectivity         time:   [8.4205 us 8.4370 us 8.4590 us]
   
   filter context u8 w NULLs low selectivity      time:   [132.49 us 132.78 us 133.10 us]
   filter context u8 w NULLs high selectivity     time:   [2.1935 us 2.1979 us 2.2030 us]
   filter context u8 w NULLs very low selectivity time:   [161.64 us 162.12 us 162.55 us]
   
   filter context f32 low selectivity             time:   [158.81 us 161.58 us 164.61 us]
   filter context f32 high selectivity            time:   [1.8318 us 1.8371 us 1.8436 us]
   filter context f32 very low selectivity        time:   [18.658 us 18.785 us 18.935 us]
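
The batch-level strategy described in this comment can be sketched as follows. This is a simplified illustration on a plain `&[u8]` with no null bitmap, not the code from the PR; `filter_by_batches` is a hypothetical name, and the assumption that bit `j` of batch `i` (least significant bit first) selects `data[i * 64 + j]` is made for the example only.

```rust
/// Filters `data` with a bit-packed filter holding 64 filter bits per `u64`.
/// Bit `j` of `filter_u64[i]` selects `data[i * 64 + j]`; filter bits that
/// point past the end of `data` are ignored.
fn filter_by_batches(data: &[u8], filter_u64: &[u64]) -> Vec<u8> {
    // Count the set bits up front so the output is allocated only once.
    let selected: usize = filter_u64.iter().map(|b| b.count_ones() as usize).sum();
    let mut out = Vec::with_capacity(selected);

    for (i, &batch) in filter_u64.iter().enumerate() {
        let start = i * 64;
        if batch == 0 {
            // No bits set: skip all 64 values without inspecting them.
            continue;
        }
        if batch == u64::MAX && start + 64 <= data.len() {
            // All bits set: copy the 64 values in one go.
            out.extend_from_slice(&data[start..start + 64]);
            continue;
        }
        // Mixed batch: test each of the 64 bits individually.
        for j in 0..64 {
            if batch & (1u64 << j) != 0 {
                if let Some(&value) = data.get(start + j) {
                    out.push(value);
                }
            }
        }
    }
    out
}
```

With a mostly-zero filter almost every iteration takes the first branch, and with a mostly-one filter almost every iteration takes the second, so only mixed batches pay the per-bit cost.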
   
   @andygrove  any thoughts?

