Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/18 13:03:23 UTC

[GitHub] [arrow] mqy commented on a change in pull request #9235: ARROW-11291: [Rust] Add extend to MutableBuffer (-25% for arithmetic, -97% for length)

mqy commented on a change in pull request #9235:
URL: https://github.com/apache/arrow/pull/9235#discussion_r559460523



##########
File path: rust/arrow/src/array/array_primitive.rs
##########
@@ -295,20 +295,21 @@ impl<T: ArrowPrimitiveType, Ptr: Borrow<Option<<T as ArrowPrimitiveType>::Native
         let data_len = data_len.expect("Iterator must be sized"); // panic if no upper bound.
 
         let num_bytes = bit_util::ceil(data_len, 8);
-        let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, false);
+        let mut null_buf = MutableBuffer::from_len_zeroed(num_bytes);
         let mut val_buf = MutableBuffer::new(
             data_len * mem::size_of::<<T as ArrowPrimitiveType>::Native>(),
         );
 
-        let null = vec![0; mem::size_of::<<T as ArrowPrimitiveType>::Native>()];
-
         let null_slice = null_buf.as_slice_mut();
         iter.enumerate().for_each(|(i, item)| {
             if let Some(a) = item.borrow() {
                 bit_util::set_bit(null_slice, i);
-                val_buf.extend_from_slice(a.to_byte_slice());
+                val_buf.push(*a);
             } else {
-                val_buf.extend_from_slice(&null);
+                // this ensures that null items on the buffer are not arbitrary.
+                // This is important because falible operations can use null values (e.g. a vectorized "add")

Review comment:
       falible -> fallible

##########
File path: rust/arrow/src/buffer.rs
##########
@@ -758,39 +793,54 @@ impl MutableBuffer {
         }
     }
 
-    /// Ensures that this buffer has at least `capacity` slots in this buffer. This will
-    /// also ensure the new capacity will be a multiple of 64 bytes.
-    ///
-    /// Returns the new capacity for this buffer.
-    pub fn reserve(&mut self, capacity: usize) -> usize {
-        if capacity > self.capacity {
-            let new_capacity = bit_util::round_upto_multiple_of_64(capacity);
-            let new_capacity = cmp::max(new_capacity, self.capacity * 2);
-            self.data =
-                unsafe { memory::reallocate(self.data, self.capacity, new_capacity) };
-            self.capacity = new_capacity;
+    fn reserve_at_least(&mut self, capacity: usize) {
+        let new_capacity = bit_util::round_upto_multiple_of_64(capacity);
+        let new_capacity = std::cmp::max(new_capacity, self.capacity * 2);
+        self.data = unsafe { memory::reallocate(self.data, self.capacity, new_capacity) };
+        self.capacity = new_capacity;
+    }
+
+    /// Ensures that this buffer has at least `self.len + additional` bytes. This re-allocates iff
+    /// `self.len + additional > capacity`.
+    /// # Example
+    /// ```
+    /// # use arrow::buffer::{Buffer, MutableBuffer};
+    /// let mut buffer = MutableBuffer::new(0);
+    /// buffer.reserve(253); // allocates for the first time
+    /// (0..253u8).for_each(|i| buffer.push(i)); // no reallocation
+    /// let buffer: Buffer = buffer.into();
+    /// assert_eq!(buffer.len(), 253);
+    /// ```
+    // For performance reasons, this must be inlined so that the `if` is executed inside the caller, and not as an extra call that just
+    // exits.
+    #[inline(always)]
+    pub fn reserve(&mut self, additional: usize) {
+        let required_cap = self.len + additional;
+        if required_cap > self.capacity {
+            self.reserve_at_least(required_cap)
         }
-        self.capacity
     }
 
-    /// Resizes the buffer so that the `len` will equal to the `new_len`.
-    ///
-    /// If `new_len` is greater than `len`, the buffer's length is simply adjusted to be
-    /// the former, optionally extending the capacity. The data between `len` and
-    /// `new_len` will be zeroed out.
-    ///
-    /// If `new_len` is less than `len`, the buffer will be truncated.
-    pub fn resize(&mut self, new_len: usize) {
+    /// Resizes the buffer, either truncating its contents (with no change in capacity), or

Review comment:
       In order to reduce memory pressure in some scenarios, perhaps it would be sound to make the grow-only (no capacity shrink) behavior configurable with an additional boolean argument (defaulting to true).
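
A minimal sketch of what that could look like, assuming the `memory::reallocate` and `bit_util::round_upto_multiple_of_64` helpers visible in this diff; the `grow_only` parameter and its semantics are hypothetical, and since Rust has no default arguments, callers would pass `true` explicitly:

```rust
/// Hypothetical variant of `resize`: when `grow_only` is false, truncation
/// also releases excess capacity (rounded to a multiple of 64 bytes).
pub fn resize(&mut self, new_len: usize, value: u8, grow_only: bool) {
    if new_len > self.len {
        let diff = new_len - self.len;
        self.reserve(diff);
        // fill the newly visible bytes with `value`
        unsafe { std::ptr::write_bytes(self.data.as_ptr().add(self.len), value, diff) };
    } else if !grow_only {
        let new_capacity = bit_util::round_upto_multiple_of_64(new_len);
        if new_capacity < self.capacity {
            // shrink the allocation to reduce memory pressure
            self.data =
                unsafe { memory::reallocate(self.data, self.capacity, new_capacity) };
            self.capacity = new_capacity;
        }
    }
    self.len = new_len;
}
```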

##########
File path: rust/arrow/src/buffer.rs
##########
@@ -861,49 +913,147 @@ impl MutableBuffer {
         }
     }
 
-    /// View buffer as typed slice.
+    /// View this buffer as a slice of a specific type.
+    /// # Safety
+    /// This function must only be used when this buffer was extended with items of type `T`.
+    /// Failure to do so results in undefined behavior.
     pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
-        assert_eq!(self.len() % mem::size_of::<T>(), 0);
-        assert!(memory::is_ptr_aligned::<T>(self.data.cast()));
-        // JUSTIFICATION
-        //  Benefit
-        //      Many of the buffers represent specific types, and consumers of `Buffer` often need to re-interpret them.
-        //  Soundness
-        //      * The pointer is non-null by construction
-        //      * alignment asserted above
         unsafe {
-            std::slice::from_raw_parts_mut(
-                self.as_ptr() as *mut T,
-                self.len() / mem::size_of::<T>(),
-            )
+            let (prefix, offsets, suffix) = self.as_slice_mut().align_to_mut::<T>();
+            assert!(prefix.is_empty() && suffix.is_empty());
+            offsets
         }
     }
 
-    /// Extends the buffer from a byte slice, incrementing its capacity if needed.
-    #[inline]
-    pub fn extend_from_slice(&mut self, bytes: &[u8]) {
-        let new_len = self.len + bytes.len();
-        if new_len > self.capacity {
-            self.reserve(new_len);
+    /// Extends this buffer from a slice of items that can be represented in bytes, increasing its capacity if needed.
+    /// # Example
+    /// ```
+    /// # use arrow::buffer::MutableBuffer;
+    /// let mut buffer = MutableBuffer::new(0);
+    /// buffer.extend_from_slice(&[2u32, 0]);
+    /// assert_eq!(buffer.len(), 8) // u32 has 4 bytes
+    /// ```
+    pub fn extend_from_slice<T: ToByteSlice>(&mut self, items: &[T]) {
+        let len = items.len();
+        let additional = len * std::mem::size_of::<T>();
+        self.reserve(additional);
+        unsafe {
+            let dst = self.data.as_ptr().add(self.len);
+            let src = items.as_ptr() as *const u8;
+            std::ptr::copy_nonoverlapping(src, dst, additional)
         }
+        self.len += additional;
+    }
+
+    /// Extends the buffer with a new item, increasing its capacity if needed.
+    /// # Example
+    /// ```
+    /// # use arrow::buffer::MutableBuffer;
+    /// let mut buffer = MutableBuffer::new(0);
+    /// buffer.push(256u32);
+    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
+    /// ```
+    #[inline]
+    pub fn push<T: ToByteSlice>(&mut self, item: T) {
+        let additional = std::mem::size_of::<T>();
+        self.reserve(additional);
         unsafe {
-            let dst = NonNull::new_unchecked(self.data.as_ptr().add(self.len));
-            let src = NonNull::new_unchecked(bytes.as_ptr() as *mut u8);
-            memory::memcpy(dst, src, bytes.len());
+            let dst = self.data.as_ptr().add(self.len) as *mut T;
+            std::ptr::write(dst, item);
         }
-        self.len = new_len;
+        self.len += additional;
+    }
+
+    /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
+    #[inline]
+    pub fn extend_zeros(&mut self, additional: usize) {
+        self.resize(self.len + additional, 0);
     }
+}
 
-    /// Extends the buffer by `len` with all bytes equal to `0u8`, incrementing its capacity if needed.
-    pub fn extend(&mut self, len: usize) {
-        let remaining_capacity = self.capacity - self.len;
-        if len > remaining_capacity {
-            self.reserve(self.len + len);
+impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
+    /// Extends [`MutableBuffer`] from an [`Iterator`]. If the iterator is an [`ExactSizeIterator`],
+    /// use [`extend_from_exact_iter`] as it is faster.
+    #[inline]
+    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
+        let iterator = iter.into_iter();
+        self.extend_from_iter(iterator)
+    }
+}
+
+// This implementation is required for two reasons:
+// 1. In stable rust, there is no trait specialization and therefore
+//    we can't specialize `extend_from_iter` for `ExactSizeIterator` like `Vec` does.
+// 2. Extend for `ExactSizeIterator` is at least 2x faster (different scaling).
+// Unfortunately, there is no "FromExactSizeIterator", only `FromIterator`,
+// which means that `collect` is slow and we should avoid it to build a Buffer. This should

Review comment:
       "This should" what :D

##########
File path: rust/arrow/src/buffer.rs
##########
@@ -861,49 +913,147 @@ impl MutableBuffer {
         }
     }
 
-    /// View buffer as typed slice.
+    /// View this buffer as a slice of a specific type.
+    /// # Safety
+    /// This function must only be used when this buffer was extended with items of type `T`.
+    /// Failure to do so results in undefined behavior.
     pub fn typed_data_mut<T: ArrowNativeType>(&mut self) -> &mut [T] {
-        assert_eq!(self.len() % mem::size_of::<T>(), 0);
-        assert!(memory::is_ptr_aligned::<T>(self.data.cast()));
-        // JUSTIFICATION
-        //  Benefit
-        //      Many of the buffers represent specific types, and consumers of `Buffer` often need to re-interpret them.
-        //  Soundness
-        //      * The pointer is non-null by construction
-        //      * alignment asserted above
         unsafe {
-            std::slice::from_raw_parts_mut(
-                self.as_ptr() as *mut T,
-                self.len() / mem::size_of::<T>(),
-            )
+            let (prefix, offsets, suffix) = self.as_slice_mut().align_to_mut::<T>();
+            assert!(prefix.is_empty() && suffix.is_empty());
+            offsets
         }
     }
 
-    /// Extends the buffer from a byte slice, incrementing its capacity if needed.
-    #[inline]
-    pub fn extend_from_slice(&mut self, bytes: &[u8]) {
-        let new_len = self.len + bytes.len();
-        if new_len > self.capacity {
-            self.reserve(new_len);
+    /// Extends this buffer from a slice of items that can be represented in bytes, increasing its capacity if needed.
+    /// # Example
+    /// ```
+    /// # use arrow::buffer::MutableBuffer;
+    /// let mut buffer = MutableBuffer::new(0);
+    /// buffer.extend_from_slice(&[2u32, 0]);
+    /// assert_eq!(buffer.len(), 8) // u32 has 4 bytes
+    /// ```
+    pub fn extend_from_slice<T: ToByteSlice>(&mut self, items: &[T]) {
+        let len = items.len();
+        let additional = len * std::mem::size_of::<T>();
+        self.reserve(additional);
+        unsafe {
+            let dst = self.data.as_ptr().add(self.len);
+            let src = items.as_ptr() as *const u8;
+            std::ptr::copy_nonoverlapping(src, dst, additional)
         }
+        self.len += additional;
+    }
+
+    /// Extends the buffer with a new item, increasing its capacity if needed.
+    /// # Example
+    /// ```
+    /// # use arrow::buffer::MutableBuffer;
+    /// let mut buffer = MutableBuffer::new(0);
+    /// buffer.push(256u32);
+    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
+    /// ```
+    #[inline]
+    pub fn push<T: ToByteSlice>(&mut self, item: T) {
+        let additional = std::mem::size_of::<T>();
+        self.reserve(additional);
         unsafe {
-            let dst = NonNull::new_unchecked(self.data.as_ptr().add(self.len));
-            let src = NonNull::new_unchecked(bytes.as_ptr() as *mut u8);
-            memory::memcpy(dst, src, bytes.len());
+            let dst = self.data.as_ptr().add(self.len) as *mut T;
+            std::ptr::write(dst, item);
         }
-        self.len = new_len;
+        self.len += additional;
+    }
+
+    /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
+    #[inline]
+    pub fn extend_zeros(&mut self, additional: usize) {
+        self.resize(self.len + additional, 0);
     }
+}
 
-    /// Extends the buffer by `len` with all bytes equal to `0u8`, incrementing its capacity if needed.
-    pub fn extend(&mut self, len: usize) {
-        let remaining_capacity = self.capacity - self.len;
-        if len > remaining_capacity {
-            self.reserve(self.len + len);
+impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
+    /// Extends [`MutableBuffer`] from an [`Iterator`]. If the iterator is an [`ExactSizeIterator`],
+    /// use [`extend_from_exact_iter`] as it is faster.
+    #[inline]
+    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
+        let iterator = iter.into_iter();
+        self.extend_from_iter(iterator)
+    }
+}
+
+// This implementation is required for two reasons:
+// 1. In stable rust, there is no trait specialization and therefore
+//    we can't specialize `extend_from_iter` for `ExactSizeIterator` like `Vec` does.
+// 2. Extend for `ExactSizeIterator` is at least 2x faster (different scaling).
+// Unfortunately, there is no "FromExactSizeIterator", only `FromIterator`,
+// which means that `collect` is slow and we should avoid it to build a Buffer. This should
+impl MutableBuffer {
+    /// This method is private because the public interface is [`extend`]
+    /// It is here because it is similar to `extend_from_exact_iter`.
+    fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
+        &mut self,
+        mut iterator: I,
+    ) {
+        let size = std::mem::size_of::<T>();
+
+        let (lower, _) = iterator.size_hint();
+
+        self.reserve(lower * size);
+        let mut dst = unsafe { self.data.as_ptr().add(self.len) as *mut T };
+
+        while let Some(item) = iterator.next() {
+            if self.len >= self.capacity() {
+                let (lower, _) = iterator.size_hint();
+                let additional = (lower + 1) * size;
+                self.reserve_at_least(self.len + additional);

Review comment:
       `reserve_at_least` may be called many times here; any chance of computing the total additional capacity before calling `reserve_at_least`?
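
For a general `Iterator` only a lower bound is known, but for an `ExactSizeIterator` the total can indeed be computed before the loop so that `reserve` runs exactly once. A sketch of that idea, using the method name referenced in the doc comment above (the body is illustrative, not necessarily the PR's actual implementation):

```rust
impl MutableBuffer {
    /// Illustrative: with an exact size, all capacity is reserved up front
    /// and the loop only writes items.
    fn extend_from_exact_iter<T: ArrowNativeType, I: ExactSizeIterator<Item = T>>(
        &mut self,
        iterator: I,
    ) {
        // the total number of bytes is known before iterating
        let additional = iterator.len() * std::mem::size_of::<T>();
        self.reserve(additional);
        unsafe {
            let mut dst = self.data.as_ptr().add(self.len) as *mut T;
            for item in iterator {
                std::ptr::write(dst, item);
                dst = dst.add(1);
            }
        }
        self.len += additional;
    }
}
```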




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org