Posted to github@arrow.apache.org by GitBox <gi...@apache.org> on 2021/01/22 12:35:56 UTC

[GitHub] [arrow] alamb commented on a change in pull request #9235: ARROW-11291: [Rust] Add extend to MutableBuffer (-20% for arithmetic, -97% for length)

alamb commented on a change in pull request #9235:
URL: https://github.com/apache/arrow/pull/9235#discussion_r562596060



##########
File path: rust/arrow/src/buffer.rs
##########
@@ -1003,6 +1156,28 @@ impl PartialEq for MutableBuffer {
 unsafe impl Sync for MutableBuffer {}
 unsafe impl Send for MutableBuffer {}
 
+struct SetLenOnDrop<'a> {

Review comment:
       This construct is fascinating, but I will take your word that it was necessary for the speedup.
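
For readers unfamiliar with the pattern, here is a minimal sketch of a set-length-on-drop guard. It assumes the PR's `SetLenOnDrop` resembles the guard of the same name in the standard library's `Vec`, which keeps the length in a local variable so the optimizer can see it does not alias stores through the data pointer. The `count_into` helper is hypothetical and exists only to exercise the guard.

```rust
/// Sketch of a guard that holds the length in a local field and writes it
/// back to the borrowed `usize` when dropped. Field names follow the diff
/// (`local_len`); the rest is an assumption, not the PR's actual code.
struct SetLenOnDrop<'a> {
    len: &'a mut usize,
    local_len: usize,
}

impl<'a> SetLenOnDrop<'a> {
    fn new(len: &'a mut usize) -> Self {
        let local_len = *len;
        SetLenOnDrop { len, local_len }
    }
}

impl Drop for SetLenOnDrop<'_> {
    fn drop(&mut self) {
        // The only store to the real length happens here, once.
        *self.len = self.local_len;
    }
}

/// Hypothetical helper: bumps the length once per item via the guard.
fn count_into(len: &mut usize, iter: impl Iterator<Item = u8>) {
    let mut guard = SetLenOnDrop::new(len);
    for _ in iter {
        guard.local_len += 1;
    }
    // `guard` is dropped here, which publishes `local_len` into `len`.
}
```

Inside the loop only `local_len` changes, so the hot path never touches the real length field.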

##########
File path: rust/arrow/src/buffer.rs
##########
@@ -963,11 +970,157 @@ impl MutableBuffer {
 
     /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
     #[inline]
-    pub fn extend(&mut self, additional: usize) {
+    pub fn extend_zeros(&mut self, additional: usize) {
         self.resize(self.len + additional, 0);
     }
 }
 
+/// # Safety
+/// `ptr` must be allocated for `old_capacity`.

Review comment:
       ```suggestion
   /// Ensures `ptr` points to at least `new_capacity` bytes, employing
   /// a doubling strategy if reallocation is needed.
   ///
   /// # Safety
   /// `ptr` must be allocated for `old_capacity`.
   ```
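
To make the doubling strategy concrete, the capacity arithmetic from the diff can be sketched in isolation. Both function names below are hypothetical stand-ins (the diff calls `bit_util::round_upto_multiple_of_64`), and the sketch assumes the inputs are small enough that the additions and multiplications do not overflow.

```rust
/// Hypothetical stand-in for `bit_util::round_upto_multiple_of_64`:
/// rounds `n` up to the next multiple of 64 (assumes no overflow).
fn round_up_to_multiple_of_64(n: usize) -> usize {
    (n + 63) & !63
}

/// Capacity chosen when growing: the rounded request, or double the old
/// capacity, whichever is larger (mirrors the two `let` lines in the diff).
fn grown_capacity(old_capacity: usize, requested: usize) -> usize {
    let rounded = round_up_to_multiple_of_64(requested);
    std::cmp::max(rounded, old_capacity * 2)
}
```

Doubling wins for small growth requests, so repeated small extensions trigger only a logarithmic number of reallocations, while a large single request is honored directly after rounding.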

##########
File path: rust/arrow/src/buffer.rs
##########
@@ -963,11 +970,157 @@ impl MutableBuffer {
 
     /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
     #[inline]
-    pub fn extend(&mut self, additional: usize) {
+    pub fn extend_zeros(&mut self, additional: usize) {
         self.resize(self.len + additional, 0);
     }
 }
 
+/// # Safety
+/// `ptr` must be allocated for `old_capacity`.
+#[inline]
+unsafe fn reallocate(
+    ptr: NonNull<u8>,
+    old_capacity: usize,
+    new_capacity: usize,
+) -> (NonNull<u8>, usize) {
+    let new_capacity = bit_util::round_upto_multiple_of_64(new_capacity);
+    let new_capacity = std::cmp::max(new_capacity, old_capacity * 2);
+    let ptr = memory::reallocate(ptr, old_capacity, new_capacity);
+    (ptr, new_capacity)
+}
+
+impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
+    #[inline]
+    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
+        let iterator = iter.into_iter();
+        self.extend_from_iter(iterator)
+    }
+}
+
+impl MutableBuffer {
+    #[inline]
+    fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
+        &mut self,
+        mut iterator: I,
+    ) {
+        let size = std::mem::size_of::<T>();
+        let (lower, _) = iterator.size_hint();
+        let additional = lower * size;
+        self.reserve(additional);
+
+        // this is necessary because of https://github.com/rust-lang/rust/issues/32155
+        let mut len = SetLenOnDrop::new(&mut self.len);
+        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) as *mut T };
+        let capacity = self.capacity;
+
+        while len.local_len + size <= capacity {
+            if let Some(item) = iterator.next() {
+                unsafe {
+                    std::ptr::write(dst, item);
+                    dst = dst.add(1);
+                }
+                len.local_len += size;
+            } else {
+                break;
+            }
+        }
+        drop(len);
+
+        iterator.for_each(|item| self.push(item));
+    }
+}
+
+impl Buffer {
+    /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length.
+    /// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
+    /// # Example
+    /// ```
+    /// # use arrow::buffer::Buffer;
+    /// let v = vec![1u32];
+    /// let iter = v.iter().map(|x| x * 2);
+    /// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };
+    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
+    /// ```
+    /// # Safety
+    /// This method assumes that the iterator's size is correct and is undefined behavior
+    /// to use it on an iterator that reports an incorrect length.
+    // This implementation is required for two reasons:
+    // 1. there is no trait `TrustedLen` in stable rust and therefore
+    //    we can't specialize `extend` for `TrustedLen` like `Vec` does.
+    // 2. `from_trusted_len_iter` is faster.
+    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
+        iterator: I,
+    ) -> Self {
+        let (_, upper) = iterator.size_hint();
+        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
+        let len = upper * std::mem::size_of::<T>();
+
+        let mut buffer = MutableBuffer::new(len);
+
+        let mut dst = buffer.data.as_ptr() as *mut T;
+        for item in iterator {
+            // note how there is no reserve here (compared with `extend_from_iter`)
+            std::ptr::write(dst, item);
+            dst = dst.add(1);
+        }
+        buffer.len = len;

Review comment:
       ```suggestion
           assert_eq!(dst.offset_from(buffer.data.as_ptr() as *mut T) as usize, upper,
                   "Trusted iterator length was not accurately reported");
           buffer.len = len;
   ```

##########
File path: rust/arrow/src/buffer.rs
##########
@@ -963,11 +970,157 @@ impl MutableBuffer {
 
     /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
     #[inline]
-    pub fn extend(&mut self, additional: usize) {
+    pub fn extend_zeros(&mut self, additional: usize) {
         self.resize(self.len + additional, 0);
     }
 }
 
+/// # Safety
+/// `ptr` must be allocated for `old_capacity`.
+#[inline]
+unsafe fn reallocate(
+    ptr: NonNull<u8>,
+    old_capacity: usize,
+    new_capacity: usize,
+) -> (NonNull<u8>, usize) {
+    let new_capacity = bit_util::round_upto_multiple_of_64(new_capacity);
+    let new_capacity = std::cmp::max(new_capacity, old_capacity * 2);
+    let ptr = memory::reallocate(ptr, old_capacity, new_capacity);
+    (ptr, new_capacity)
+}
+
+impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
+    #[inline]
+    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
+        let iterator = iter.into_iter();
+        self.extend_from_iter(iterator)
+    }
+}
+
+impl MutableBuffer {
+    #[inline]
+    fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
+        &mut self,
+        mut iterator: I,
+    ) {
+        let size = std::mem::size_of::<T>();
+        let (lower, _) = iterator.size_hint();
+        let additional = lower * size;
+        self.reserve(additional);
+
+        // this is necessary because of https://github.com/rust-lang/rust/issues/32155
+        let mut len = SetLenOnDrop::new(&mut self.len);
+        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) as *mut T };
+        let capacity = self.capacity;
+
+        while len.local_len + size <= capacity {
+            if let Some(item) = iterator.next() {
+                unsafe {
+                    std::ptr::write(dst, item);
+                    dst = dst.add(1);
+                }
+                len.local_len += size;
+            } else {
+                break;
+            }
+        }
+        drop(len);
+
+        iterator.for_each(|item| self.push(item));
+    }
+}
+
+impl Buffer {
+    /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length.
+    /// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
+    /// # Example
+    /// ```
+    /// # use arrow::buffer::Buffer;
+    /// let v = vec![1u32];
+    /// let iter = v.iter().map(|x| x * 2);
+    /// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };
+    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
+    /// ```
+    /// # Safety
+    /// This method assumes that the iterator's size is correct and is undefined behavior
+    /// to use it on an iterator that reports an incorrect length.
+    // This implementation is required for two reasons:
+    // 1. there is no trait `TrustedLen` in stable rust and therefore
+    //    we can't specialize `extend` for `TrustedLen` like `Vec` does.
+    // 2. `from_trusted_len_iter` is faster.
+    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
+        iterator: I,
+    ) -> Self {
+        let (_, upper) = iterator.size_hint();
+        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
+        let len = upper * std::mem::size_of::<T>();
+
+        let mut buffer = MutableBuffer::new(len);
+
+        let mut dst = buffer.data.as_ptr() as *mut T;
+        for item in iterator {
+            // note how there is no reserve here (compared with `extend_from_iter`)
+            std::ptr::write(dst, item);
+            dst = dst.add(1);
+        }
+        buffer.len = len;

Review comment:
       The same comment applies to `try_from_trusted_len_iter`.

##########
File path: rust/arrow/src/buffer.rs
##########
@@ -963,11 +970,157 @@ impl MutableBuffer {
 
     /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
     #[inline]
-    pub fn extend(&mut self, additional: usize) {
+    pub fn extend_zeros(&mut self, additional: usize) {
         self.resize(self.len + additional, 0);
     }
 }
 
+/// # Safety
+/// `ptr` must be allocated for `old_capacity`.
+#[inline]
+unsafe fn reallocate(
+    ptr: NonNull<u8>,
+    old_capacity: usize,
+    new_capacity: usize,
+) -> (NonNull<u8>, usize) {
+    let new_capacity = bit_util::round_upto_multiple_of_64(new_capacity);
+    let new_capacity = std::cmp::max(new_capacity, old_capacity * 2);
+    let ptr = memory::reallocate(ptr, old_capacity, new_capacity);
+    (ptr, new_capacity)
+}
+
+impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
+    #[inline]
+    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
+        let iterator = iter.into_iter();
+        self.extend_from_iter(iterator)
+    }
+}
+
+impl MutableBuffer {
+    #[inline]
+    fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
+        &mut self,
+        mut iterator: I,
+    ) {
+        let size = std::mem::size_of::<T>();
+        let (lower, _) = iterator.size_hint();
+        let additional = lower * size;
+        self.reserve(additional);
+
+        // this is necessary because of https://github.com/rust-lang/rust/issues/32155
+        let mut len = SetLenOnDrop::new(&mut self.len);
+        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) as *mut T };
+        let capacity = self.capacity;
+
+        while len.local_len + size <= capacity {
+            if let Some(item) = iterator.next() {
+                unsafe {
+                    std::ptr::write(dst, item);
+                    dst = dst.add(1);
+                }
+                len.local_len += size;
+            } else {
+                break;
+            }
+        }
+        drop(len);
+
+        iterator.for_each(|item| self.push(item));

Review comment:
       ```suggestion
           // Handle the case where size_hint was lower than actual number of items
           iterator.for_each(|item| self.push(item));
   ```
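
A small sketch of why the trailing `for_each` is needed, using a plain `Vec` rather than `MutableBuffer`. The `NoHint` wrapper and `extend_with_fallback` are hypothetical names introduced for illustration. The function returns how many items took the slow path so the effect of an under-reporting `size_hint` is visible.

```rust
/// Hypothetical wrapper that hides the inner iterator's length:
/// the default `size_hint` returns `(0, None)`.
struct NoHint<I>(I);

impl<I: Iterator> Iterator for NoHint<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<I::Item> {
        self.0.next()
    }
}

/// Reserves for the lower bound, fills the reserved space, then falls
/// back to ordinary pushes for anything the hint did not account for.
/// Returns the number of items that went through the fallback.
fn extend_with_fallback<T, I: Iterator<Item = T>>(dst: &mut Vec<T>, mut iter: I) -> usize {
    let (lower, _) = iter.size_hint();
    dst.reserve(lower);

    // Fast path: these pushes never reallocate because len < capacity.
    while dst.len() < dst.capacity() {
        match iter.next() {
            Some(item) => dst.push(item),
            None => return 0,
        }
    }

    // Slow path: size_hint was lower than the actual number of items.
    let mut slow = 0;
    for item in iter {
        dst.push(item);
        slow += 1;
    }
    slow
}
```

With an accurate hint the fallback loop does nothing. With `NoHint` every item lands there, which is the case the suggested comment describes.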

##########
File path: rust/arrow/src/buffer.rs
##########
@@ -963,11 +970,157 @@ impl MutableBuffer {
 
     /// Extends the buffer by `additional` bytes equal to `0u8`, incrementing its capacity if needed.
     #[inline]
-    pub fn extend(&mut self, additional: usize) {
+    pub fn extend_zeros(&mut self, additional: usize) {
         self.resize(self.len + additional, 0);
     }
 }
 
+/// # Safety
+/// `ptr` must be allocated for `old_capacity`.
+#[inline]
+unsafe fn reallocate(
+    ptr: NonNull<u8>,
+    old_capacity: usize,
+    new_capacity: usize,
+) -> (NonNull<u8>, usize) {
+    let new_capacity = bit_util::round_upto_multiple_of_64(new_capacity);
+    let new_capacity = std::cmp::max(new_capacity, old_capacity * 2);
+    let ptr = memory::reallocate(ptr, old_capacity, new_capacity);
+    (ptr, new_capacity)
+}
+
+impl<A: ArrowNativeType> Extend<A> for MutableBuffer {
+    #[inline]
+    fn extend<T: IntoIterator<Item = A>>(&mut self, iter: T) {
+        let iterator = iter.into_iter();
+        self.extend_from_iter(iterator)
+    }
+}
+
+impl MutableBuffer {
+    #[inline]
+    fn extend_from_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
+        &mut self,
+        mut iterator: I,
+    ) {
+        let size = std::mem::size_of::<T>();
+        let (lower, _) = iterator.size_hint();
+        let additional = lower * size;
+        self.reserve(additional);
+
+        // this is necessary because of https://github.com/rust-lang/rust/issues/32155
+        let mut len = SetLenOnDrop::new(&mut self.len);
+        let mut dst = unsafe { self.data.as_ptr().add(len.local_len) as *mut T };
+        let capacity = self.capacity;
+
+        while len.local_len + size <= capacity {
+            if let Some(item) = iterator.next() {
+                unsafe {
+                    std::ptr::write(dst, item);
+                    dst = dst.add(1);
+                }
+                len.local_len += size;
+            } else {
+                break;
+            }
+        }
+        drop(len);
+
+        iterator.for_each(|item| self.push(item));
+    }
+}
+
+impl Buffer {
+    /// Creates a [`Buffer`] from an [`Iterator`] with a trusted (upper) length.
+    /// Prefer this to `collect` whenever possible, as it is faster ~60% faster.
+    /// # Example
+    /// ```
+    /// # use arrow::buffer::Buffer;
+    /// let v = vec![1u32];
+    /// let iter = v.iter().map(|x| x * 2);
+    /// let buffer = unsafe { Buffer::from_trusted_len_iter(iter) };
+    /// assert_eq!(buffer.len(), 4) // u32 has 4 bytes
+    /// ```
+    /// # Safety
+    /// This method assumes that the iterator's size is correct and is undefined behavior
+    /// to use it on an iterator that reports an incorrect length.
+    // This implementation is required for two reasons:
+    // 1. there is no trait `TrustedLen` in stable rust and therefore
+    //    we can't specialize `extend` for `TrustedLen` like `Vec` does.
+    // 2. `from_trusted_len_iter` is faster.
+    pub unsafe fn from_trusted_len_iter<T: ArrowNativeType, I: Iterator<Item = T>>(
+        iterator: I,
+    ) -> Self {
+        let (_, upper) = iterator.size_hint();
+        let upper = upper.expect("from_trusted_len_iter requires an upper limit");
+        let len = upper * std::mem::size_of::<T>();
+
+        let mut buffer = MutableBuffer::new(len);
+
+        let mut dst = buffer.data.as_ptr() as *mut T;
+        for item in iterator {
+            // note how there is no reserve here (compared with `extend_from_iter`)
+            std::ptr::write(dst, item);
+            dst = dst.add(1);
+        }
+        buffer.len = len;

Review comment:
       I think it would be valuable to assert here that the number of items written was *actually* equal to `upper`, and thus panic if the invariant did not hold (of course, only after memory had been overwritten and undefined behavior had begun). It might make bugs easier to catch.
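
A rough sketch of the check, written against `Vec<u32>` instead of `MutableBuffer` so that it stands alone. `Lying` and `collect_trusted` are hypothetical names. The demonstration iterator over-reports its length (it yields fewer items than claimed), so nothing is written out of bounds and the assert fires before the length is set.

```rust
/// Hypothetical iterator that claims `claimed` items but yields `actual`.
struct Lying {
    claimed: usize,
    actual: usize,
    next: u32,
}

impl Iterator for Lying {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        if (self.next as usize) < self.actual {
            let value = self.next;
            self.next += 1;
            Some(value)
        } else {
            None
        }
    }
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.claimed, Some(self.claimed))
    }
}

/// # Safety
/// The iterator must not yield more items than its reported upper bound;
/// otherwise the writes below go past the allocation.
unsafe fn collect_trusted<I: Iterator<Item = u32>>(iter: I) -> Vec<u32> {
    let (_, upper) = iter.size_hint();
    let upper = upper.expect("collect_trusted requires an upper limit");
    let mut out: Vec<u32> = Vec::with_capacity(upper);
    let start = out.as_mut_ptr();
    let mut dst = start;
    for item in iter {
        unsafe {
            std::ptr::write(dst, item);
            dst = dst.add(1);
        }
    }
    // Compare what was written against what was promised.
    let written = unsafe { dst.offset_from(start) } as usize;
    assert_eq!(written, upper, "Trusted iterator length was not accurately reported");
    unsafe { out.set_len(upper) };
    out
}
```

The check cannot prevent an out-of-bounds write from an iterator that yields too many items, but it does stop a short iterator from exposing uninitialized memory through the final length.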



