You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/07 21:01:26 UTC
[arrow-rs] branch master updated: Decouple buffer deallocation from ffi and allow creating buffers from rust vec (#1494)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 688dd4c69 Decouple buffer deallocation from ffi and allow creating buffers from rust vec (#1494)
688dd4c69 is described below
commit 688dd4c69b8f38d5d14a7bd71a8345fdcc6654e7
Author: Jörn Horstmann <gi...@jhorstmann.net>
AuthorDate: Thu Apr 7 23:01:21 2022 +0200
Decouple buffer deallocation from ffi and allow creating buffers from rust vec (#1494)
* Decouple buffer deallocation from ffi and allow zero-copy buffer creation from rust vectors or strings
* Move allocation owner to alloc module
* Rename and comment Deallocation variants
* Fix doc link
* Explicitly assert that Buffer is UnwindSafe
* fix: doc comment
Co-authored-by: Andrew Lamb <an...@nerdnetworks.org>
---
arrow/src/alloc/mod.rs | 32 +++++++++++++++++++++
arrow/src/array/data.rs | 53 ++++++++++++++++++++++++++++++++--
arrow/src/buffer/immutable.rs | 66 +++++++++++++++++++++++++++++++++++++------
arrow/src/buffer/mutable.rs | 5 ++--
arrow/src/bytes.rs | 42 ++++++++-------------------
arrow/src/ffi.rs | 3 +-
6 files changed, 156 insertions(+), 45 deletions(-)
diff --git a/arrow/src/alloc/mod.rs b/arrow/src/alloc/mod.rs
index 88ab81877..418bc95fd 100644
--- a/arrow/src/alloc/mod.rs
+++ b/arrow/src/alloc/mod.rs
@@ -19,8 +19,11 @@
//! regions, cache and allocation alignments.
use std::alloc::{handle_alloc_error, Layout};
+use std::fmt::{Debug, Formatter};
use std::mem::size_of;
+use std::panic::RefUnwindSafe;
use std::ptr::NonNull;
+use std::sync::Arc;
mod alignment;
mod types;
@@ -121,3 +124,32 @@ pub unsafe fn reallocate<T: NativeType>(
handle_alloc_error(Layout::from_size_align_unchecked(new_size, ALIGNMENT))
})
}
+
+/// The owner of an allocation.
+/// The owner's `Drop` implementation is responsible for freeing the memory once no more references exist.
+pub trait Allocation: RefUnwindSafe {}
+
+impl<T: RefUnwindSafe> Allocation for T {}
+
+/// Mode of deallocating memory regions
+pub(crate) enum Deallocation {
+ /// An allocation of the given capacity that needs to be deallocated using Arrow's cache-aligned allocator.
+ /// See [allocate_aligned] and [free_aligned].
+ Arrow(usize),
+ /// An allocation from an external source like the FFI interface or a Rust Vec.
+ /// Deallocation happens when the last reference to the owner is dropped.
+ Custom(Arc<dyn Allocation>),
+}
+
+impl Debug for Deallocation {
+ fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+ match self {
+ Deallocation::Arrow(capacity) => {
+ write!(f, "Deallocation::Arrow {{ capacity: {} }}", capacity)
+ }
+ Deallocation::Custom(_) => {
+ write!(f, "Deallocation::Custom {{ capacity: unknown }}")
+ }
+ }
+ }
+}
diff --git a/arrow/src/array/data.rs b/arrow/src/array/data.rs
index 551bf536c..8622c0a2c 100644
--- a/arrow/src/array/data.rs
+++ b/arrow/src/array/data.rs
@@ -1459,10 +1459,11 @@ impl ArrayDataBuilder {
#[cfg(test)]
mod tests {
use super::*;
+ use std::ptr::NonNull;
use crate::array::{
- Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array, StringArray,
- StructBuilder, UInt64Array,
+ make_array, Array, BooleanBuilder, Int32Array, Int32Builder, Int64Array,
+ StringArray, StructBuilder, UInt64Array,
};
use crate::buffer::Buffer;
use crate::datatypes::Field;
@@ -2594,4 +2595,52 @@ mod tests {
assert_eq!(&struct_array_slice, &cloned);
}
+
+ #[test]
+ fn test_string_data_from_foreign() {
+ let mut strings = "foobarfoobar".to_owned();
+ let mut offsets = vec![0_i32, 0, 3, 6, 12];
+ let mut bitmap = vec![0b1110_u8];
+
+ let strings_buffer = unsafe {
+ Buffer::from_custom_allocation(
+ NonNull::new_unchecked(strings.as_mut_ptr()),
+ strings.len(),
+ Arc::new(strings),
+ )
+ };
+ let offsets_buffer = unsafe {
+ Buffer::from_custom_allocation(
+ NonNull::new_unchecked(offsets.as_mut_ptr() as *mut u8),
+ offsets.len() * std::mem::size_of::<i32>(),
+ Arc::new(offsets),
+ )
+ };
+ let null_buffer = unsafe {
+ Buffer::from_custom_allocation(
+ NonNull::new_unchecked(bitmap.as_mut_ptr()),
+ bitmap.len(),
+ Arc::new(bitmap),
+ )
+ };
+
+ let data = ArrayData::try_new(
+ DataType::Utf8,
+ 4,
+ None,
+ Some(null_buffer),
+ 0,
+ vec![offsets_buffer, strings_buffer],
+ vec![],
+ )
+ .unwrap();
+
+ let array = make_array(data);
+ let array = array.as_any().downcast_ref::<StringArray>().unwrap();
+
+ let expected =
+ StringArray::from(vec![None, Some("foo"), Some("bar"), Some("foobar")]);
+
+ assert_eq!(array, &expected);
+ }
}
diff --git a/arrow/src/buffer/immutable.rs b/arrow/src/buffer/immutable.rs
index b918c0d44..c34ea101b 100644
--- a/arrow/src/buffer/immutable.rs
+++ b/arrow/src/buffer/immutable.rs
@@ -21,12 +21,10 @@ use std::ptr::NonNull;
use std::sync::Arc;
use std::{convert::AsRef, usize};
+use crate::alloc::{Allocation, Deallocation};
+use crate::ffi::FFI_ArrowArray;
use crate::util::bit_chunk_iterator::{BitChunks, UnalignedBitChunk};
-use crate::{
- bytes::{Bytes, Deallocation},
- datatypes::ArrowNativeType,
- ffi,
-};
+use crate::{bytes::Bytes, datatypes::ArrowNativeType};
use super::ops::bitwise_unary_op_helper;
use super::MutableBuffer;
@@ -76,7 +74,7 @@ impl Buffer {
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
pub unsafe fn from_raw_parts(ptr: NonNull<u8>, len: usize, capacity: usize) -> Self {
assert!(len <= capacity);
- Buffer::build_with_arguments(ptr, len, Deallocation::Native(capacity))
+ Buffer::build_with_arguments(ptr, len, Deallocation::Arrow(capacity))
}
/// Creates a buffer from an existing memory region (must already be byte-aligned), this
@@ -86,18 +84,41 @@ impl Buffer {
///
/// * `ptr` - Pointer to raw parts
/// * `len` - Length of raw parts in **bytes**
- /// * `data` - An [ffi::FFI_ArrowArray] with the data
+ /// * `data` - An [crate::ffi::FFI_ArrowArray] with the data
///
/// # Safety
///
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes and that the foreign deallocator frees the region.
+ #[deprecated(
+ note = "use from_custom_allocation instead which makes it clearer that the allocation is in fact owned"
+ )]
pub unsafe fn from_unowned(
ptr: NonNull<u8>,
len: usize,
- data: Arc<ffi::FFI_ArrowArray>,
+ data: Arc<FFI_ArrowArray>,
+ ) -> Self {
+ Self::from_custom_allocation(ptr, len, data)
+ }
+
+ /// Creates a buffer from an existing memory region. Ownership of the memory is tracked via reference counting
+ /// and the memory will be freed when `owner` is dropped, i.e. when the reference count reaches zero.
+ ///
+ /// # Arguments
+ ///
+ /// * `ptr` - Pointer to raw parts
+ /// * `len` - Length of raw parts in **bytes**
+ /// * `owner` - A [crate::alloc::Allocation] which is responsible for freeing that data
+ ///
+ /// # Safety
+ ///
+ /// This function is unsafe as there is no guarantee that the given pointer is valid for `len` bytes for as long as `owner` is alive.
+ pub unsafe fn from_custom_allocation(
+ ptr: NonNull<u8>,
+ len: usize,
+ owner: Arc<dyn Allocation>,
) -> Self {
- Buffer::build_with_arguments(ptr, len, Deallocation::Foreign(data))
+ Buffer::build_with_arguments(ptr, len, Deallocation::Custom(owner))
}
/// Auxiliary method to create a new Buffer
@@ -321,6 +342,7 @@ impl<T: ArrowNativeType> FromIterator<T> for Buffer {
#[cfg(test)]
mod tests {
+ use std::panic::{RefUnwindSafe, UnwindSafe};
use std::thread;
use super::*;
@@ -533,4 +555,30 @@ mod tests {
Buffer::from(&[0b01101101, 0b10101010]).count_set_bits_offset(7, 9)
);
}
+
+ #[test]
+ fn test_unwind_safe() {
+ fn assert_unwind_safe<T: RefUnwindSafe + UnwindSafe>() {}
+ assert_unwind_safe::<Buffer>()
+ }
+
+ #[test]
+ fn test_from_foreign_vec() {
+ let mut vector = vec![1_i32, 2, 3, 4, 5];
+ let buffer = unsafe {
+ Buffer::from_custom_allocation(
+ NonNull::new_unchecked(vector.as_mut_ptr() as *mut u8),
+ vector.len() * std::mem::size_of::<i32>(),
+ Arc::new(vector),
+ )
+ };
+
+ let slice = unsafe { buffer.typed_data::<i32>() };
+ assert_eq!(slice, &[1, 2, 3, 4, 5]);
+
+ let buffer = buffer.slice(std::mem::size_of::<i32>());
+
+ let slice = unsafe { buffer.typed_data::<i32>() };
+ assert_eq!(slice, &[2, 3, 4, 5]);
+ }
}
diff --git a/arrow/src/buffer/mutable.rs b/arrow/src/buffer/mutable.rs
index bbe7bb405..709973b44 100644
--- a/arrow/src/buffer/mutable.rs
+++ b/arrow/src/buffer/mutable.rs
@@ -16,9 +16,10 @@
// under the License.
use super::Buffer;
+use crate::alloc::Deallocation;
use crate::{
alloc,
- bytes::{Bytes, Deallocation},
+ bytes::Bytes,
datatypes::{ArrowNativeType, ToByteSlice},
util::bit_util,
};
@@ -266,7 +267,7 @@ impl MutableBuffer {
#[inline]
pub(super) fn into_buffer(self) -> Buffer {
let bytes = unsafe {
- Bytes::new(self.data, self.len, Deallocation::Native(self.capacity))
+ Bytes::new(self.data, self.len, Deallocation::Arrow(self.capacity))
};
std::mem::forget(self);
Buffer::from_bytes(bytes)
diff --git a/arrow/src/bytes.rs b/arrow/src/bytes.rs
index bc92b9057..7b57552e6 100644
--- a/arrow/src/bytes.rs
+++ b/arrow/src/bytes.rs
@@ -21,31 +21,10 @@
use core::slice;
use std::ptr::NonNull;
-use std::sync::Arc;
use std::{fmt::Debug, fmt::Formatter};
-use crate::{alloc, ffi};
-
-/// Mode of deallocating memory regions
-pub enum Deallocation {
- /// Native deallocation, using Rust deallocator with Arrow-specific memory alignment
- Native(usize),
- /// Foreign interface, via a callback
- Foreign(Arc<ffi::FFI_ArrowArray>),
-}
-
-impl Debug for Deallocation {
- fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
- match self {
- Deallocation::Native(capacity) => {
- write!(f, "Deallocation::Native {{ capacity: {} }}", capacity)
- }
- Deallocation::Foreign(_) => {
- write!(f, "Deallocation::Foreign {{ capacity: unknown }}")
- }
- }
- }
-}
+use crate::alloc;
+use crate::alloc::Deallocation;
/// A continuous, fixed-size, immutable memory region that knows how to de-allocate itself.
/// This structs' API is inspired by the `bytes::Bytes`, but it is not limited to using rust's
@@ -53,8 +32,9 @@ impl Debug for Deallocation {
///
/// In the most common case, this buffer is allocated using [`allocate_aligned`](crate::alloc::allocate_aligned)
/// and deallocated accordingly [`free_aligned`](crate::alloc::free_aligned).
-/// When the region is allocated by an foreign allocator, [Deallocation::Foreign], this calls the
-/// foreign deallocator to deallocate the region when it is no longer needed.
+///
+/// When the region is allocated by a different allocator, [Deallocation::Custom], it is freed by
+/// dropping its owner once the region is no longer needed.
pub struct Bytes {
/// The raw pointer to be beginning of the region
ptr: NonNull<u8>,
@@ -80,7 +60,7 @@ impl Bytes {
/// This function is unsafe as there is no guarantee that the given pointer is valid for `len`
/// bytes. If the `ptr` and `capacity` come from a `Buffer`, then this is guaranteed.
#[inline]
- pub unsafe fn new(
+ pub(crate) unsafe fn new(
ptr: std::ptr::NonNull<u8>,
len: usize,
deallocation: Deallocation,
@@ -113,10 +93,10 @@ impl Bytes {
pub fn capacity(&self) -> usize {
match self.deallocation {
- Deallocation::Native(capacity) => capacity,
+ Deallocation::Arrow(capacity) => capacity,
// we cannot determine this in general,
// and thus we state that this is externally-owned memory
- Deallocation::Foreign(_) => 0,
+ Deallocation::Custom(_) => 0,
}
}
}
@@ -125,11 +105,11 @@ impl Drop for Bytes {
#[inline]
fn drop(&mut self) {
match &self.deallocation {
- Deallocation::Native(capacity) => {
+ Deallocation::Arrow(capacity) => {
unsafe { alloc::free_aligned::<u8>(self.ptr, *capacity) };
}
- // foreign interface knows how to deallocate itself.
- Deallocation::Foreign(_) => (),
+ // The automatic drop implementation will free the memory once the reference count reaches zero
+ Deallocation::Custom(_allocation) => (),
}
}
}
diff --git a/arrow/src/ffi.rs b/arrow/src/ffi.rs
index 72f69d846..a6ccfa02a 100644
--- a/arrow/src/ffi.rs
+++ b/arrow/src/ffi.rs
@@ -538,7 +538,8 @@ unsafe fn create_buffer(
assert!(index < array.n_buffers as usize);
let ptr = *buffers.add(index);
- NonNull::new(ptr as *mut u8).map(|ptr| Buffer::from_unowned(ptr, len, owner))
+ NonNull::new(ptr as *mut u8)
+ .map(|ptr| Buffer::from_custom_allocation(ptr, len, owner))
}
fn create_child(