You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2020/09/16 05:10:25 UTC

[GitHub] [arrow] nevi-me commented on a change in pull request #8170: ARROW-9971: [Rust] Improve speed of `take` by 2x-3x (change scaling with batch size)

nevi-me commented on a change in pull request #8170:
URL: https://github.com/apache/arrow/pull/8170#discussion_r489159456



##########
File path: rust/arrow/src/compute/kernels/take.rs
##########
@@ -166,42 +166,124 @@ fn take_primitive<T>(values: &ArrayRef, indices: &UInt32Array) -> Result<ArrayRe
 where
     T: ArrowPrimitiveType,
 {
-    let mut builder = PrimitiveBuilder::<T>::new(indices.len());
-    let a = values.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
-    for i in 0..indices.len() {
-        if indices.is_null(i) {
-            // populate with null if index is null
-            builder.append_null()?;
-        } else {
-            // get index value to use in looking up the value from `values`
-            let ix = indices.value(i) as usize;
-            if a.is_valid(ix) {
-                builder.append_value(a.value(ix))?;
-            } else {
-                builder.append_null()?;
+    let data_len = indices.len();
+
+    let array = values.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
+
+    let num_bytes = bit_util::ceil(data_len, 8);
+    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);

Review comment:
       Out of curiosity, would it be better if we set all bits to true, then unset the null ones? The typical case will be that arrays are dense, so we could likely reduce the number of bit-setting calls. I haven't looked at the generated assembly to see whether the compiler uses bit-manipulation instructions, or how many instructions we spend on bit manipulation in general.
   
   I'm assuming that if we start with `MutableBuffer::new(num_bytes).with_bitset(num_bytes, true)`, we end up with the trailing bits set wherever `data_len % 8 != 0`. If this causes problems with equality comparisons, we could reset those bits to `false`.
   
   Do you mind trying out this approach and observing the impact on the benchmark?

##########
File path: rust/arrow/src/compute/kernels/take.rs
##########
@@ -166,42 +166,124 @@ fn take_primitive<T>(values: &ArrayRef, indices: &UInt32Array) -> Result<ArrayRe
 where
     T: ArrowPrimitiveType,
 {
-    let mut builder = PrimitiveBuilder::<T>::new(indices.len());
-    let a = values.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
-    for i in 0..indices.len() {
-        if indices.is_null(i) {
-            // populate with null if index is null
-            builder.append_null()?;
-        } else {
-            // get index value to use in looking up the value from `values`
-            let ix = indices.value(i) as usize;
-            if a.is_valid(ix) {
-                builder.append_value(a.value(ix))?;
-            } else {
-                builder.append_null()?;
+    let data_len = indices.len();
+
+    let array = values.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
+
+    let num_bytes = bit_util::ceil(data_len, 8);
+    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
+
+    let null_slice = null_buf.data_mut();
+
+    let new_values: Vec<T::Native> = (0..data_len)
+        .map(|i| {
+            let index = indices.value(i) as usize;
+            if array.is_valid(index) {
+                bit_util::set_bit(null_slice, i);
+            }
+            array.value(index)
+        })
+        .collect();
+
+    let nulls = match indices.data_ref().null_buffer() {
+        Some(buffer) => buffer_bin_and(buffer, 0, &null_buf.freeze(), 0, indices.len()),
+        None => null_buf.freeze(),
+    };
+
+    let data = ArrayData::new(
+        T::get_data_type(),
+        indices.len(),
+        None,
+        Some(nulls),
+        0,
+        vec![Buffer::from(new_values.to_byte_slice())],
+        vec![],
+    );
+    Ok(Arc::new(PrimitiveArray::<T>::from(Arc::new(data))))
+}
+
+/// `take` implementation for boolean arrays
+fn take_boolean(values: &ArrayRef, indices: &UInt32Array) -> Result<ArrayRef> {
+    let data_len = indices.len();
+
+    let array = values.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+    let num_byte = bit_util::ceil(data_len, 8);
+    let mut null_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, false);
+    let mut val_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, false);
+
+    let null_slice = null_buf.data_mut();
+    let val_slice = val_buf.data_mut();
+
+    (0..data_len).for_each(|i| {
+        let index = indices.value(i) as usize;
+        if array.is_valid(index) {
+            bit_util::set_bit(null_slice, i);
+            if array.value(index) {
+                bit_util::set_bit(val_slice, i);
             }
         }
-    }
-    Ok(Arc::new(builder.finish()) as ArrayRef)
+    });
+
+    let nulls = match indices.data_ref().null_buffer() {
+        Some(buffer) => buffer_bin_and(buffer, 0, &null_buf.freeze(), 0, indices.len()),
+        None => null_buf.freeze(),
+    };
+
+    let data = ArrayData::new(
+        DataType::Boolean,
+        indices.len(),
+        None,
+        Some(nulls),
+        0,
+        vec![val_buf.freeze()],
+        vec![],
+    );
+    Ok(Arc::new(BooleanArray::from(Arc::new(data))))
 }
 
 /// `take` implementation for string arrays
 fn take_string(values: &ArrayRef, indices: &UInt32Array) -> Result<ArrayRef> {
-    let mut builder = StringBuilder::new(indices.len());
-    let a = values.as_any().downcast_ref::<StringArray>().unwrap();
-    for i in 0..indices.len() {
-        if indices.is_null(i) {
-            builder.append(false)?;
-        } else {
-            let ix = indices.value(i) as usize;
-            if a.is_null(ix) {
-                builder.append(false)?;
-            } else {
-                builder.append_value(a.value(ix))?;
-            }
-        }
+    let data_len = indices.len();
+
+    let array = values.as_any().downcast_ref::<StringArray>().unwrap();
+
+    let num_bytes = bit_util::ceil(data_len, 8);
+    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
+    let null_slice = null_buf.data_mut();
+
+    let mut offsets = Vec::with_capacity(data_len + 1);
+    let mut values = Vec::with_capacity(data_len);

Review comment:
       Would we benefit from allocating this with a capacity equal to the length of the input array's string value buffer? I'm assuming that `data_len` is not enough, as most strings will have more than one character.

##########
File path: rust/arrow/src/compute/kernels/take.rs
##########
@@ -166,42 +166,124 @@ fn take_primitive<T>(values: &ArrayRef, indices: &UInt32Array) -> Result<ArrayRe
 where
     T: ArrowPrimitiveType,
 {
-    let mut builder = PrimitiveBuilder::<T>::new(indices.len());
-    let a = values.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
-    for i in 0..indices.len() {
-        if indices.is_null(i) {
-            // populate with null if index is null
-            builder.append_null()?;
-        } else {
-            // get index value to use in looking up the value from `values`
-            let ix = indices.value(i) as usize;
-            if a.is_valid(ix) {
-                builder.append_value(a.value(ix))?;
-            } else {
-                builder.append_null()?;
+    let data_len = indices.len();
+
+    let array = values.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
+
+    let num_bytes = bit_util::ceil(data_len, 8);
+    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
+
+    let null_slice = null_buf.data_mut();
+
+    let new_values: Vec<T::Native> = (0..data_len)
+        .map(|i| {
+            let index = indices.value(i) as usize;
+            if array.is_valid(index) {
+                bit_util::set_bit(null_slice, i);
+            }
+            array.value(index)
+        })
+        .collect();
+
+    let nulls = match indices.data_ref().null_buffer() {
+        Some(buffer) => buffer_bin_and(buffer, 0, &null_buf.freeze(), 0, indices.len()),
+        None => null_buf.freeze(),
+    };
+
+    let data = ArrayData::new(
+        T::get_data_type(),
+        indices.len(),
+        None,
+        Some(nulls),
+        0,
+        vec![Buffer::from(new_values.to_byte_slice())],
+        vec![],
+    );
+    Ok(Arc::new(PrimitiveArray::<T>::from(Arc::new(data))))
+}
+
+/// `take` implementation for boolean arrays
+fn take_boolean(values: &ArrayRef, indices: &UInt32Array) -> Result<ArrayRef> {
+    let data_len = indices.len();
+
+    let array = values.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+    let num_byte = bit_util::ceil(data_len, 8);
+    let mut null_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, false);

Review comment:
       If the above works, we could do the same with the `null_buf` here.

##########
File path: rust/arrow/src/compute/kernels/take.rs
##########
@@ -166,42 +166,124 @@ fn take_primitive<T>(values: &ArrayRef, indices: &UInt32Array) -> Result<ArrayRe
 where
     T: ArrowPrimitiveType,
 {
-    let mut builder = PrimitiveBuilder::<T>::new(indices.len());
-    let a = values.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
-    for i in 0..indices.len() {
-        if indices.is_null(i) {
-            // populate with null if index is null
-            builder.append_null()?;
-        } else {
-            // get index value to use in looking up the value from `values`
-            let ix = indices.value(i) as usize;
-            if a.is_valid(ix) {
-                builder.append_value(a.value(ix))?;
-            } else {
-                builder.append_null()?;
+    let data_len = indices.len();
+
+    let array = values.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
+
+    let num_bytes = bit_util::ceil(data_len, 8);
+    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
+
+    let null_slice = null_buf.data_mut();
+
+    let new_values: Vec<T::Native> = (0..data_len)
+        .map(|i| {
+            let index = indices.value(i) as usize;
+            if array.is_valid(index) {
+                bit_util::set_bit(null_slice, i);
+            }
+            array.value(index)
+        })
+        .collect();
+
+    let nulls = match indices.data_ref().null_buffer() {
+        Some(buffer) => buffer_bin_and(buffer, 0, &null_buf.freeze(), 0, indices.len()),
+        None => null_buf.freeze(),
+    };
+
+    let data = ArrayData::new(
+        T::get_data_type(),
+        indices.len(),
+        None,
+        Some(nulls),
+        0,
+        vec![Buffer::from(new_values.to_byte_slice())],
+        vec![],
+    );
+    Ok(Arc::new(PrimitiveArray::<T>::from(Arc::new(data))))
+}
+
+/// `take` implementation for boolean arrays
+fn take_boolean(values: &ArrayRef, indices: &UInt32Array) -> Result<ArrayRef> {
+    let data_len = indices.len();
+
+    let array = values.as_any().downcast_ref::<BooleanArray>().unwrap();
+
+    let num_byte = bit_util::ceil(data_len, 8);
+    let mut null_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, false);
+    let mut val_buf = MutableBuffer::new(num_byte).with_bitset(num_byte, false);
+
+    let null_slice = null_buf.data_mut();
+    let val_slice = val_buf.data_mut();
+
+    (0..data_len).for_each(|i| {
+        let index = indices.value(i) as usize;
+        if array.is_valid(index) {
+            bit_util::set_bit(null_slice, i);
+            if array.value(index) {
+                bit_util::set_bit(val_slice, i);
             }
         }
-    }
-    Ok(Arc::new(builder.finish()) as ArrayRef)
+    });
+
+    let nulls = match indices.data_ref().null_buffer() {
+        Some(buffer) => buffer_bin_and(buffer, 0, &null_buf.freeze(), 0, indices.len()),
+        None => null_buf.freeze(),
+    };
+
+    let data = ArrayData::new(
+        DataType::Boolean,
+        indices.len(),
+        None,
+        Some(nulls),
+        0,
+        vec![val_buf.freeze()],
+        vec![],
+    );
+    Ok(Arc::new(BooleanArray::from(Arc::new(data))))
 }
 
 /// `take` implementation for string arrays
 fn take_string(values: &ArrayRef, indices: &UInt32Array) -> Result<ArrayRef> {

Review comment:
       Do you mind adding a `take` implementation for `LargeStringArray`?

##########
File path: rust/arrow/src/compute/kernels/take.rs
##########
@@ -425,22 +514,32 @@ mod tests {
     #[test]
     fn test_take_string() {
         let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(4)]);
-        let mut builder: StringBuilder = StringBuilder::new(6);
-        builder.append_value("one").unwrap();
-        builder.append_null().unwrap();
-        builder.append_value("three").unwrap();
-        builder.append_value("four").unwrap();
-        builder.append_value("five").unwrap();
-        let array = Arc::new(builder.finish()) as ArrayRef;
-        let a = take(&array, &index, None).unwrap();
-        assert_eq!(a.len(), index.len());
-        builder.append_value("four").unwrap();
-        builder.append_null().unwrap();
-        builder.append_null().unwrap();
-        builder.append_value("four").unwrap();
-        builder.append_value("five").unwrap();
-        let b = builder.finish();
-        assert_eq!(a.data(), b.data());
+
+        let array = StringArray::from(vec![
+            Some("one"),
+            None,
+            Some("three"),
+            Some("four"),
+            Some("five"),
+        ]);
+        let array = Arc::new(array) as ArrayRef;
+
+        let actual = take(&array, &index, None).unwrap();
+        assert_eq!(actual.len(), index.len());
+
+        let actual = actual.as_any().downcast_ref::<StringArray>().unwrap();
+
+        let expected =
+            StringArray::from(vec![Some("four"), None, None, Some("four"), Some("five")]);
+
+        for i in 0..index.len() {

Review comment:
       I'll have a look at this. Almost time to start with $dayjob :)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org