Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/07 21:01:26 UTC

[arrow-rs] branch master updated: Decouple buffer deallocation from ffi and allow creating buffers from rust vec (#1494)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 688dd4c69 Decouple buffer deallocation from ffi and allow creating buffers from rust vec (#1494)
688dd4c69 is described below

commit 688dd4c69b8f38d5d14a7bd71a8345fdcc6654e7
Author: Jörn Horstmann <gi...@jhorstmann.net>
AuthorDate: Thu Apr 7 23:01:21 2022 +0200

    Decouple buffer deallocation from ffi and allow creating buffers from rust vec (#1494)
    
    * Decouple buffer deallocation from ffi and allow zero-copy buffer creation from rust vectors or strings
    
    * Move allocation owner to alloc module
    
    * Rename and comment Deallocation variants
    
    * Fix doc link
    
    * Explicitly assert that Buffer is UnwindSafe
    
    * fix: doc comment
    
    Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
 arrow/src/alloc/mod.rs        | 32 +++++++++++++++++++++
 arrow/src/array/data.rs       | 53 ++++++++++++++++++++++++++++++++--
 arrow/src/buffer/immutable.rs | 66 +++++++++++++++++++++++++++++++++++++------
 arrow/src/buffer/mutable.rs   |  5 ++--
 arrow/src/bytes.rs            | 42 ++++++++-------------------
 arrow/src/ffi.rs              |  3 +-
 6 files changed, 156 insertions(+), 45 deletions(-)
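
For orientation, below is a minimal sketch (not part of the patch) of the zero-copy path this change enables: an Arrow array built directly on top of memory owned by a plain Rust Vec. It assumes the `arrow` crate as of this commit and elides error handling.

    use std::ptr::NonNull;
    use std::sync::Arc;
    use arrow::array::{ArrayData, Int32Array};
    use arrow::buffer::Buffer;
    use arrow::datatypes::DataType;

    fn main() {
        let mut values = vec![1i32, 2, 3, 4, 5];
        let len_in_bytes = values.len() * std::mem::size_of::<i32>();
        // SAFETY: the Vec itself becomes the buffer's owner below, so the raw
        // pointer stays valid until the last Buffer referencing it is dropped.
        let buffer = unsafe {
            Buffer::from_custom_allocation(
                NonNull::new_unchecked(values.as_mut_ptr() as *mut u8),
                len_in_bytes,
                Arc::new(values),
            )
        };

        // Wrap the buffer into array data without copying the values.
        let data = ArrayData::try_new(
            DataType::Int32,
            5,
            None,
            None,
            0,
            vec![buffer],
            vec![],
        )
        .unwrap();
        let array = Int32Array::from(data);
        assert_eq!(array.values(), &[1, 2, 3, 4, 5]);
    }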

diff --git a/arrow/src/alloc/mod.rs b/arrow/src/alloc/mod.rs
index 88ab81877..418bc95fd 100644
--- a/arrow/src/alloc/mod.rs
+++ b/arrow/src/alloc/mod.rs
@@ -19,8 +19,11 @@
 //! regions, cache and allocation alignments.
 
 use std::alloc::{handle_alloc_error, Layout};
+use std::fmt::{Debug, Formatter};
 use std::mem::size_of;
+use std::panic::RefUnwindSafe;
 use std::ptr::NonNull;
+use std::sync::Arc;
 
 mod alignment;
 mod types;
@@ -121,3 +124,32 @@ pub unsafe fn reallocate<T: NativeType>(
         handle_alloc_error(Layout::from_size_align_unchecked(new_size, ALIGNMENT))
     })
 }
+
+/// The owner of an allocation.
+/// The trait implementation is responsible for dropping the allocations once no more references exist.
+pub trait Allocation: RefUnwindSafe {}
+
+impl<T: RefUnwindSafe> Allocation for T {}
+
+/// Mode of deallocating memory regions
+pub(crate) enum Deallocation {
+    /// An allocation of the given capacity that needs to be deallocated using Arrow's cache-aligned allocator.
+    /// See [allocate_aligned] and [free_aligned].
+    Arrow(usize),
+    /// An allocation from an external source like the FFI interface or a Rust Vec.
+    /// Deallocation will happen when the last reference to the owner is dropped.
+    Custom(Arc<dyn Allocation>),
+}
+
+impl Debug for Deallocation {
+    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+        match self {
+            Deallocation::Arrow(capacity) => {
+                write!(f, "Deallocation::Arrow {{ capacity: {} }}", capacity)
+            }
+            Deallocation::Custom(_) => {
+                write!(f, "Deallocation::Custom {{ capacity: unknown }}")
+            }
+        }
+    }
+}
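
To illustrate what the new Deallocation::Custom variant does at runtime, here is a sketch that is not part of the patch: because of the blanket `impl<T: RefUnwindSafe> Allocation for T`, any ordinary owner type can back a buffer, and its `Drop` only runs once the last buffer referencing it is gone. `LoggingOwner` is a hypothetical type used purely for illustration.

    use std::ptr::NonNull;
    use std::sync::Arc;
    use arrow::buffer::Buffer;

    // Hypothetical owner type; any `T: RefUnwindSafe` qualifies as an `Allocation`.
    struct LoggingOwner(Vec<u8>);

    impl Drop for LoggingOwner {
        fn drop(&mut self) {
            println!("backing allocation released");
        }
    }

    fn main() {
        let mut owner = LoggingOwner(vec![1, 2, 3, 4]);
        let ptr = NonNull::new(owner.0.as_mut_ptr()).unwrap();
        let len = owner.0.len();
        // SAFETY: the Arc keeps the owner (and thus the Vec behind `ptr`) alive
        // for as long as any Buffer created from it exists.
        let buffer = unsafe { Buffer::from_custom_allocation(ptr, len, Arc::new(owner)) };

        let sliced = buffer.slice(1); // shares the same underlying allocation
        drop(buffer);                 // owner is still alive: `sliced` references it
        assert_eq!(sliced.as_slice(), &[2, 3, 4]);
        drop(sliced);                 // "backing allocation released" is printed here
    }
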
diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs
index 551bf536c..8622c0a2c 100644
--- a/arrow/src/array/data.rs
+++ b/arrow/src/array/data.rs
@@ -1459,10 +1459,11 @@ impl ArrayDataBuilder {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use std::ptr::NonNull;
 
     use crate::array::{
-        Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array, StringArray,
-        StructBuilder, UInt64Array,
+        make_array, Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array,
+        StringArray, StructBuilder, UInt64Array,
     };
     use crate::buffer::Buffer;
     use crate::datatypes::Field;
@@ -2594,4 +2595,52 @@ mod tests {
 
         assert_eq!(&struct_array_slice, &cloned);
     }
+
+    #[test]
+    fn test_string_data_from_foreign() {
+        let mut strings = "foobarfoobar".to_owned();
+        let mut offsets = vec![0_i32, 0, 3, 6, 12];
+        let mut bitmap = vec![0b1110_u8];
+
+        let strings_buffer = unsafe {
+            Buffer::from_custom_allocation(
+                NonNull::new_unchecked(strings.as_mut_ptr()),
+                strings.len(),
+                Arc::new(strings),
+            )
+        };
+        let offsets_buffer = unsafe {
+            Buffer::from_custom_allocation(
+                NonNull::new_unchecked(offsets.as_mut_ptr() as *mut u8),
+                offsets.len() * std::mem::size_of::<i32>(),
+                Arc::new(offsets),
+            )
+        };
+        let null_buffer = unsafe {
+            Buffer::from_custom_allocation(
+                NonNull::new_unchecked(bitmap.as_mut_ptr()),
+                bitmap.len(),
+                Arc::new(bitmap),
+            )
+        };
+
+        let data = ArrayData::try_new(
+            DataType::Utf8,
+            4,
+            None,
+            Some(null_buffer),
+            0,
+            vec![offsets_buffer, strings_buffer],
+            vec![],
+        )
+        .unwrap();
+
+        let array = make_array(data);
+        let array = array.as_any().downcast_ref::<StringArray>().unwrap();
+
+        let expected =
+            StringArray::from(vec![None, Some("foo"), Some("bar"), Some("foobar")]);
+
+        assert_eq!(array, &expected);
+    }
 }
diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs
index b918c0d44..c34ea101b 100644
--- a/arrow/src/buffer/immutable.rs
+++ b/arrow/src/buffer/immutable.rs
@@ -21,12 +21,10 @@ use std::ptr::NonNull;
 use std::sync::Arc;
 use std::{convert::AsRef, usize};
 
+use crate::alloc::{Allocation, Deallocation};
+use crate::ffi::FFI_ArrowArray;
 use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
-use crate::{
-    bytes::{Bytes, Deallocation},
-    datatypes::ArrowNativeType,
-    ffi,
-};
+use crate::{bytes::Bytes, datatypes::ArrowNativeType};
 
 use super::ops::bitwise_unary_op_helper;
 use super::MutableBuffer;
@@ -76,7 +74,7 @@ impl Buffer {
     /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
     pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
         assert!(len <= capacity);
-        Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity))
+        Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity))
     }
 
     /// Creates a buffer from an existing memory region (must already be byte-aligned), this
@@ -86,18 +84,41 @@ impl Buffer {
     ///
     /// * `ptr` - Pointer to raw parts
     /// * `len` - Length of raw parts in **bytes**
-    /// * `data` - An [ffi::FFI_ArrowArray] with the data
+    /// * `data` - An [crate::ffi::FFI_ArrowArray] with the data
     ///
     /// # Safety
     ///
     /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
     /// bytes and that the foreign deallocator frees the region.
+    #[deprecated(
+        note = "use from_custom_allocation instead which makes it clearer that the allocation is in fact owned"
+    )]
     pub unsafe fn from_unowned(
         ptr: NonNull<u8>,
         len: usize,
-        data: Arc<ffi::FFI_ArrowArray>,
+        data: Arc<FFI_ArrowArray>,
+    ) -> Self {
+        Self::from_custom_allocation(ptr, len, data)
+    }
+
+    /// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
+    /// and the memory will be freed using the `drop` method of [crate::alloc::Allocation] when the reference count reaches zero.
+    ///
+    /// # Arguments
+    ///
+    /// * `ptr` - Pointer to raw parts
+    /// * `len` - Length of raw parts in **bytes**
+    /// * `owner` - A [crate::alloc::Allocation] which is responsible for freeing that data
+    ///
+    /// # Safety
+    ///
+    /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` bytes
+    pub unsafe fn from_custom_allocation(
+        ptr: NonNull<u8>,
+        len: usize,
+        owner: Arc<dyn Allocation>,
     ) -> Self {
-        Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data))
+        Buffer::build_with_arguments(ptr, len, Deallocation::Custom(owner))
     }
 
     /// Auxiliary method to create a new Buffer
@@ -321,6 +342,7 @@ impl<T: ArrowNativeType> FromIterator<T> for Buffer {
 
 #[cfg(test)]
 mod tests {
+    use std::panic::{RefUnwindSafe, UnwindSafe};
     use std::thread;
 
     use super::*;
@@ -533,4 +555,30 @@ mod tests {
             Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 9)
         );
     }
+
+    #[test]
+    fn test_unwind_safe() {
+        fn assert_unwind_safe<T: RefUnwindSafe + UnwindSafe>() {}
+        assert_unwind_safe::<Buffer>()
+    }
+
+    #[test]
+    fn test_from_foreign_vec() {
+        let mut vector = vec![1_i32, 2, 3, 4, 5];
+        let buffer = unsafe {
+            Buffer::from_custom_allocation(
+                NonNull::new_unchecked(vector.as_mut_ptr() as *mut u8),
+                vector.len() * std::mem::size_of::<i32>(),
+                Arc::new(vector),
+            )
+        };
+
+        let slice = unsafe { buffer.typed_data::<i32>() };
+        assert_eq!(slice, &[1, 2, 3, 4, 5]);
+
+        let buffer = buffer.slice(std::mem::size_of::<i32>());
+
+        let slice = unsafe { buffer.typed_data::<i32>() };
+        assert_eq!(slice, &[2, 3, 4, 5]);
+    }
 }
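
A usage sketch for the constructor added above (not part of the patch, assuming the `arrow` crate as of this commit): any owner that keeps the pointed-to memory alive works, for example a `String`, and callers of the deprecated `from_unowned` can simply pass their existing `Arc<FFI_ArrowArray>` to `from_custom_allocation` instead.

    use std::ptr::NonNull;
    use std::sync::Arc;
    use arrow::buffer::Buffer;

    /// Wrap the bytes of a String in a Buffer without copying them; the String
    /// itself is the `Allocation` owner that keeps the memory alive.
    fn string_to_buffer(mut s: String) -> Buffer {
        let len = s.len();
        let ptr = NonNull::new(s.as_mut_ptr()).expect("String data is never null");
        // SAFETY: moving `s` into the Arc owner keeps its heap allocation (and
        // therefore `ptr`) valid for as long as any Buffer references it.
        unsafe { Buffer::from_custom_allocation(ptr, len, Arc::new(s)) }
    }

    fn main() {
        let buffer = string_to_buffer("foobar".to_owned());
        assert_eq!(buffer.as_slice(), b"foobar");
    }
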
diff --git a/arrow/src/buffer/mutable.rs b/arrow/src/buffer/mutable.rs
index bbe7bb405..709973b44 100644
--- a/arrow/src/buffer/mutable.rs
+++ b/arrow/src/buffer/mutable.rs
@@ -16,9 +16,10 @@
 // under the License.
 
 use super::Buffer;
+use crate::alloc::Deallocation;
 use crate::{
     alloc,
-    bytes::{Bytes, Deallocation},
+    bytes::Bytes,
     datatypes::{ArrowNativeType, ToByteSlice},
     util::bit_util,
 };
@@ -266,7 +267,7 @@ impl MutableBuffer {
     #[inline]
     pub(super) fn into_buffer(self) -> Buffer {
         let bytes = unsafe {
-            Bytes::new(self.data, self.len, Deallocation::Native(self.capacity))
+            Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity))
         };
         std::mem::forget(self);
         Buffer::from_bytes(bytes)
diff --git a/arrow/src/bytes.rs b/arrow/src/bytes.rs
index bc92b9057..7b57552e6 100644
--- a/arrow/src/bytes.rs
+++ b/arrow/src/bytes.rs
@@ -21,31 +21,10 @@
 
 use core::slice;
 use std::ptr::NonNull;
-use std::sync::Arc;
 use std::{fmt::Debug, fmt::Formatter};
 
-use crate::{alloc, ffi};
-
-/// Mode of deallocating memory regions
-pub enum Deallocation {
-    /// Native deallocation, using Rust deallocator with Arrow-specific memory alignment
-    Native(usize),
-    /// Foreign interface, via a callback
-    Foreign(Arc<ffi::FFI_ArrowArray>),
-}
-
-impl Debug for Deallocation {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
-        match self {
-            Deallocation::Native(capacity) => {
-                write!(f, "Deallocation::Native {{ capacity: {} }}", capacity)
-            }
-            Deallocation::Foreign(_) => {
-                write!(f, "Deallocation::Foreign {{ capacity: unknown }}")
-            }
-        }
-    }
-}
+use crate::alloc;
+use crate::alloc::Deallocation;
 
 /// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
 /// This struct's API is inspired by `bytes::Bytes`, but it is not limited to using Rust's
@@ -53,8 +32,9 @@ impl Debug for Deallocation {
 ///
 /// In the most common case, this buffer is allocated using [`allocate_aligned`](crate::alloc::allocate_aligned)
 /// and deallocated accordingly [`free_aligned`](crate::alloc::free_aligned).
-/// When the region is allocated by an foreign allocator, [Deallocation::Foreign], this calls the
-/// foreign deallocator to deallocate the region when it is no longer needed.
+///
+/// When the region is allocated by a different allocator, [Deallocation::Custom], this calls the
+/// custom deallocator to deallocate the region when it is no longer needed.
 pub struct Bytes {
     /// The raw pointer to the beginning of the region
     ptr: NonNull<u8>,
@@ -80,7 +60,7 @@ impl Bytes {
     /// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
     /// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
     #[inline]
-    pub unsafe fn new(
+    pub(crate) unsafe fn new(
         ptr: std::ptr::NonNull<u8>,
         len: usize,
         deallocation: Deallocation,
@@ -113,10 +93,10 @@ impl Bytes {
 
     pub fn capacity(&self) -> usize {
         match self.deallocation {
-            Deallocation::Native(capacity) => capacity,
+            Deallocation::Arrow(capacity) => capacity,
             // we cannot determine this in general,
             // and thus we state that this is externally-owned memory
-            Deallocation::Foreign(_) => 0,
+            Deallocation::Custom(_) => 0,
         }
     }
 }
@@ -125,11 +105,11 @@ impl Drop for Bytes {
     #[inline]
     fn drop(&mut self) {
         match &self.deallocation {
-            Deallocation::Native(capacity) => {
+            Deallocation::Arrow(capacity) => {
                 unsafe { alloc::free_aligned::<u8>(self.ptr, *capacity) };
             }
-            // foreign interface knows how to deallocate itself.
-            Deallocation::Foreign(_) => (),
+            // The automatic drop implementation will free the memory once the reference count reaches zero
+            Deallocation::Custom(_allocation) => (),
         }
     }
 }
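
One observable consequence of the two deallocation modes, sketched below (not part of the patch, and assuming `Buffer::capacity` delegates to `Bytes::capacity` as elsewhere in this crate): buffers backed by arrow's own aligned allocator report their real capacity, while custom allocations report 0 because the size of the externally owned region is unknown.

    use std::ptr::NonNull;
    use std::sync::Arc;
    use arrow::buffer::Buffer;

    fn main() {
        // Deallocation::Arrow: allocated and freed by arrow's aligned allocator.
        let arrow_owned = Buffer::from(&[1u8, 2, 3, 4]);
        assert!(arrow_owned.capacity() >= 4);

        // Deallocation::Custom: the Vec owns the memory, so capacity is reported as 0.
        let mut values = vec![1u8, 2, 3, 4];
        let ptr = NonNull::new(values.as_mut_ptr()).unwrap();
        let len = values.len();
        let custom_owned =
            unsafe { Buffer::from_custom_allocation(ptr, len, Arc::new(values)) };
        assert_eq!(custom_owned.capacity(), 0);
    }
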
diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs
index 72f69d846..a6ccfa02a 100644
--- a/arrow/src/ffi.rs
+++ b/arrow/src/ffi.rs
@@ -538,7 +538,8 @@ unsafe fn create_buffer(
     assert!(index < array.n_buffers as usize);
     let ptr = *buffers.add(index);
 
-    NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len, owner))
+    NonNull::new(ptr as *mut u8)
+        .map(|ptr| Buffer::from_custom_allocation(ptr, len, owner))
 }
 
 fn create_child(