Posted to commits@arrow.apache.org by ap...@apache.org on 2019/01/24 13:22:39 UTC
[arrow] branch master updated: ARROW-4212: [C++][Python] CudaBuffer view of arbitrary device memory object
This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new f0c464e ARROW-4212: [C++][Python] CudaBuffer view of arbitrary device memory object
f0c464e is described below
commit f0c464ea70e8d4bf5a5ed03e95a97327a2057af4
Author: Pearu Peterson <pe...@gmail.com>
AuthorDate: Thu Jan 24 14:22:29 2019 +0100
ARROW-4212: [C++][Python] CudaBuffer view of arbitrary device memory object
This PR implements the following new features:
1. `CudaContext::GetDeviceAddress` method
2. `pyarrow.cuda.Context.get_device_address` method
3. `pyarrow.cuda.Context.buffer_from_object` method to create a CudaBuffer view of an arbitrary device memory object that implements the `__cuda_array_interface__` protocol, or that is a `CudaBuffer`, `CudaHostBuffer`, or numba `MemoryPointer` object (see the usage sketch below).
4. Tests for `buffer_from_object` method
and the following improvements:
1. `pyarrow.cuda.Context.foreign_buffer` now ensures that the provided memory address is valid for the given context.
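For illustration, a minimal usage sketch of the new Python API (a sketch only, assuming a CUDA-capable device and numba are installed; the variable names are illustrative and not part of this commit):

    import numpy as np
    from numba import cuda as nb_cuda
    from pyarrow import cuda

    ctx = cuda.Context(0)                  # CUDA context on device 0
    darr = nb_cuda.to_device(np.arange(100, dtype=np.uint8))
    # Zero-copy CudaBuffer view of the numba device array, obtained
    # through its __cuda_array_interface__:
    cbuf = ctx.buffer_from_object(darr)
    assert cbuf.size == darr.size          # uint8: one byte per item
    # Map an arbitrary (device or host) address to a device-reachable one:
    devaddr = ctx.get_device_address(darr.__cuda_array_interface__['data'][0])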
Author: Pearu Peterson <pe...@gmail.com>
Closes #3439 from pearu/arrow-4212 and squashes the following commits:
ba905739 <Pearu Peterson> Add 2-D and 3-D tests. buffer_from_object requires contiguous memory input.
355fc035 <Pearu Peterson> Apply more feedback fixes.
22134da4 <Pearu Peterson> Refactor. Add tests for negative strides and reduce test cases.
dbd1cec6 <Pearu Peterson> Moved some buffer_from_objects tests under test_cuda.
562c1805 <Pearu Peterson> Change GetDeviceAddress API to use uint8_t, add C++ tests for it, update docs and apply feedback.
2726a91e <Pearu Peterson> Add buffer_from_object and get_device_address methods.
df8656f8 <Pearu Peterson> Add GetDeviceAddress method to CudaContext.
---
cpp/src/arrow/gpu/cuda-test.cc | 14 ++++
cpp/src/arrow/gpu/cuda_context.cc | 7 ++
cpp/src/arrow/gpu/cuda_context.h | 14 ++++
python/pyarrow/_cuda.pyx | 99 +++++++++++++++++++++++--
python/pyarrow/includes/libarrow_cuda.pxd | 1 +
python/pyarrow/tests/test_cuda.py | 50 ++++++++++---
python/pyarrow/tests/test_cuda_numba_interop.py | 79 ++++++++++++++++++++
python/pyarrow/util.py | 45 +++++++++++
8 files changed, 292 insertions(+), 17 deletions(-)
diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 5d85a81..628d0f2 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -343,5 +343,19 @@ TEST_F(TestCudaArrowIpc, BasicWriteRead) {
CompareBatch(*batch, *cpu_batch);
}
+class TestCudaContext : public TestCudaBufferBase {
+ public:
+ void SetUp() { TestCudaBufferBase::SetUp(); }
+};
+
+TEST_F(TestCudaContext, GetDeviceAddress) {
+ const int64_t kSize = 100;
+ std::shared_ptr<CudaBuffer> buffer;
+ uint8_t* devptr = NULL;
+ ASSERT_OK(context_->Allocate(kSize, &buffer));
+ ASSERT_OK(context_->GetDeviceAddress(buffer.get()->mutable_data(), &devptr));
+ ASSERT_EQ(buffer.get()->mutable_data(), devptr);
+}
+
} // namespace cuda
} // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index 9e95040..2f3f1bd 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -343,5 +343,12 @@ void* CudaContext::handle() const { return impl_->context_handle(); }
int CudaContext::device_number() const { return impl_->device().device_num; }
+Status CudaContext::GetDeviceAddress(uint8_t* addr, uint8_t** devaddr) {
+ ContextSaver set_temporary(reinterpret_cast<CUcontext>(handle()));
+ CU_RETURN_NOT_OK(cuPointerGetAttribute(devaddr, CU_POINTER_ATTRIBUTE_DEVICE_POINTER,
+ reinterpret_cast<CUdeviceptr>(addr)));
+ return Status::OK();
+}
+
} // namespace cuda
} // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 8f1b34b..938a815 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -119,6 +119,20 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
/// \brief Return device number
int device_number() const;
+ /// \brief Return the device address that is reachable from kernels
+ /// running in the context
+ /// \param[in] addr device or host memory address
+ /// \param[out] devaddr the device address
+ /// \return Status
+ ///
+ /// The device address is defined as a memory address accessible by
+ /// the device. While it is often a device memory address, it can
+ /// also be a host memory address, for instance, when the memory is
+ /// allocated as host memory (using cudaMallocHost or cudaHostAlloc)
+ /// or as managed memory (using cudaMallocManaged), or when the host
+ /// memory is page-locked (using cudaHostRegister).
+ Status GetDeviceAddress(uint8_t* addr, uint8_t** devaddr);
+
private:
CudaContext();
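As a worked illustration of the doc comment above (a sketch, assuming a CUDA-capable device; it uses the Python binding added in this commit, and pyarrow's new_host_buffer, which allocates page-locked host memory):

    from pyarrow import cuda

    ctx = cuda.Context(0)
    hbuf = cuda.new_host_buffer(64)   # page-locked (pinned) host memory
    # Map the pinned host address to an address reachable from device
    # kernels; for pinned memory the two usually coincide, but CUDA
    # does not guarantee this in general.
    devaddr = ctx.get_device_address(hbuf.address)
    dbuf = ctx.foreign_buffer(devaddr, hbuf.size)
    # dbuf is only a view: hbuf must stay alive while dbuf is in use.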
diff --git a/python/pyarrow/_cuda.pyx b/python/pyarrow/_cuda.pyx
index c2d95a6..eac3dae 100644
--- a/python/pyarrow/_cuda.pyx
+++ b/python/pyarrow/_cuda.pyx
@@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.
+
from pyarrow.compat import tobytes
from pyarrow.lib cimport *
from pyarrow.includes.libarrow_cuda cimport *
-from pyarrow.lib import py_buffer, allocate_buffer, as_buffer
+from pyarrow.lib import py_buffer, allocate_buffer, as_buffer, ArrowTypeError
+from pyarrow.util import get_contiguous_span
cimport cpython as cp
@@ -137,10 +139,40 @@ cdef class Context:
@property
def bytes_allocated(self):
- """ Return the number of allocated bytes.
+ """Return the number of allocated bytes.
"""
return self.context.get().bytes_allocated()
+ def get_device_address(self, address):
+ """Return the device address that is reachable from kernels running in
+ the context
+
+ Parameters
+ ----------
+ address : int
+ Specify memory address value
+
+ Returns
+ -------
+ device_address : int
+ Device address accessible from device context
+
+ Notes
+ -----
+ The device address is defined as a memory address accessible
+ by the device. While it is often a device memory address, it
+ can also be a host memory address, for instance, when the
+ memory is allocated as host memory (using cudaMallocHost or
+ cudaHostAlloc) or as managed memory (using cudaMallocManaged),
+ or when the host memory is page-locked (using cudaHostRegister).
+ """
+ cdef:
+ uintptr_t c_addr = address
+ uint8_t* c_devaddr
+ check_status(self.context.get().GetDeviceAddress(<uint8_t*>c_addr,
+ &c_devaddr))
+ return <uintptr_t>c_devaddr
+
def new_buffer(self, nbytes):
"""Return new device buffer.
@@ -159,26 +191,32 @@ cdef class Context:
return pyarrow_wrap_cudabuffer(cudabuf)
def foreign_buffer(self, address, size):
- """Create device buffer from device address and size as a view.
+ """Create device buffer from address and size as a view.
The caller is responsible for allocating and freeing the
- memory as well as ensuring that the memory belongs to the
- CUDA context that this Context instance holds.
+ memory. When both `address` and `size` are 0, a new
+ zero-sized buffer is returned.
Parameters
----------
address : int
- Specify the starting address of the buffer.
+ Specify the starting address of the buffer. The address can
+ refer to either device or host memory, but it must be
+ accessible from the device after mapping it with the
+ `get_device_address` method.
size : int
Specify the size of device buffer in bytes.
Returns
-------
cbuf : CudaBuffer
- Device buffer as a view of device memory.
+ Device buffer as a view of device-reachable memory.
+
"""
+ if not address and size == 0:
+ return self.new_buffer(0)
cdef:
- intptr_t c_addr = address
+ uintptr_t c_addr = self.get_device_address(address)
int64_t c_size = size
shared_ptr[CCudaBuffer] cudabuf
check_status(self.context.get().View(<uint8_t*>c_addr,
@@ -246,6 +284,49 @@ cdef class Context:
result.copy_from_device(buf, position=0, nbytes=size)
return result
+ def buffer_from_object(self, obj):
+ """Create device buffer view of arbitrary object that references
+ device accessible memory.
+
+ When the object contains a non-contiguous view of device
+ accessible memory, the returned device buffer will contain a
+ contiguous view of that memory, that is, it will include the
+ intermediate data that is otherwise invisible to the input
+ object.
+
+ Parameters
+ ----------
+ obj : {object, Buffer, HostBuffer, CudaBuffer, ...}
+ Specify an object that holds a (device or host) address that
+ can be accessed from the device. This includes objects with
+ types defined in pyarrow.cuda as well as arbitrary objects
+ that implement the CUDA array interface as defined by numba.
+
+ Returns
+ -------
+ cbuf : CudaBuffer
+ Device buffer as a view of device-accessible memory.
+
+ """
+ if isinstance(obj, HostBuffer):
+ return self.foreign_buffer(obj.address, obj.size)
+ elif isinstance(obj, Buffer):
+ return CudaBuffer.from_buffer(obj)
+ elif isinstance(obj, CudaBuffer):
+ return obj
+ elif hasattr(obj, '__cuda_array_interface__'):
+ desc = obj.__cuda_array_interface__
+ addr = desc['data'][0]
+ if addr is None:
+ return self.new_buffer(0)
+ import numpy as np
+ start, end = get_contiguous_span(
+ desc['shape'], desc.get('strides'),
+ np.dtype(desc['typestr']).itemsize)
+ return self.foreign_buffer(addr + start, end - start)
+ raise ArrowTypeError('cannot create device buffer view from'
+ ' `%s` object' % (type(obj)))
+
cdef class IpcMemHandle:
"""A serializable container for a CUDA IPC handle.
@@ -343,6 +424,8 @@ cdef class CudaBuffer(Buffer):
Device buffer as a view of numba MemoryPointer.
"""
ctx = Context.from_numba(mem.context)
+ if mem.device_pointer.value is None and mem.size == 0:
+ return ctx.new_buffer(0)
return ctx.foreign_buffer(mem.device_pointer.value, mem.size)
def to_numba(self):
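A minimal round-trip sketch for the numba interop above (a sketch, assuming numba is installed; after this change, from_numba also accepts a null, zero-sized MemoryPointer and returns a zero-sized buffer for it):

    from numba import cuda as nb_cuda
    from pyarrow import cuda

    nb_ctx = nb_cuda.current_context()
    mem = nb_ctx.memalloc(100)               # numba MemoryPointer
    cbuf = cuda.CudaBuffer.from_numba(mem)   # zero-copy CudaBuffer view
    mem2 = cbuf.to_numba()                   # back to a numba MemoryPointer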
diff --git a/python/pyarrow/includes/libarrow_cuda.pxd b/python/pyarrow/includes/libarrow_cuda.pxd
index cedc432..ef89d9c 100644
--- a/python/pyarrow/includes/libarrow_cuda.pxd
+++ b/python/pyarrow/includes/libarrow_cuda.pxd
@@ -46,6 +46,7 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil:
int64_t bytes_allocated() const
const void* handle() const
int device_number() const
+ CStatus GetDeviceAddress(uint8_t* addr, uint8_t** devaddr)
cdef cppclass CCudaIpcMemHandle" arrow::cuda::CudaIpcMemHandle":
@staticmethod
diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py
index 8c86457..4633df1 100644
--- a/python/pyarrow/tests/test_cuda.py
+++ b/python/pyarrow/tests/test_cuda.py
@@ -60,7 +60,7 @@ def test_Context():
cuda.Context(cuda.Context.get_num_devices())
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
def test_manage_allocate_free_host(size):
buf = cuda.new_host_buffer(size)
arr = np.frombuffer(buf, dtype=np.uint8)
@@ -102,7 +102,7 @@ def make_random_buffer(size, target='host'):
raise ValueError('invalid target value')
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_device_buffer(size):
# Creating device buffer from host buffer;
arr, buf = make_random_buffer(size)
@@ -230,7 +230,39 @@ def test_context_device_buffer(size):
np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
+def test_context_from_object(size):
+ ctx = global_context
+ arr, cbuf = make_random_buffer(size, target='device')
+ dtype = arr.dtype
+
+ # Creating device buffer from a CUDA host buffer
+ hbuf = cuda.new_host_buffer(size * arr.dtype.itemsize)
+ np.frombuffer(hbuf, dtype=dtype)[:] = arr
+ cbuf2 = ctx.buffer_from_object(hbuf)
+ assert cbuf2.size == cbuf.size
+ arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+ np.testing.assert_equal(arr, arr2)
+
+ # Creating device buffer from a device buffer
+ cbuf2 = ctx.buffer_from_object(cbuf2)
+ assert cbuf2.size == cbuf.size
+ arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+ np.testing.assert_equal(arr, arr2)
+
+ # Trying to create a device buffer from a Buffer
+ with pytest.raises(pa.ArrowTypeError,
+ match=('buffer is not backed by a CudaBuffer')):
+ ctx.buffer_from_object(pa.py_buffer(b"123"))
+
+ # Trying to create a device buffer from numpy.array
+ with pytest.raises(pa.ArrowTypeError,
+ match=('cannot create device buffer view from'
+ ' `<class \'numpy.ndarray\'>` object')):
+ ctx.buffer_from_object(np.array([1, 2, 3]))
+
+
+@pytest.mark.parametrize("size", [0, 1, 1000])
def test_CudaBuffer(size):
arr, buf = make_random_buffer(size)
assert arr.tobytes() == buf.to_pybytes()
@@ -255,7 +287,7 @@ def test_CudaBuffer(size):
cuda.CudaBuffer()
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
def test_HostBuffer(size):
arr, buf = make_random_buffer(size)
assert arr.tobytes() == buf.to_pybytes()
@@ -281,7 +313,7 @@ def test_HostBuffer(size):
cuda.HostBuffer()
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_to_host(size):
# Create a buffer in host containing range(size)
@@ -306,7 +338,7 @@ def test_copy_from_to_host(size):
np.testing.assert_equal(arr, arr2)
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_to_host(size):
arr, dbuf = make_random_buffer(size, target='device')
@@ -366,7 +398,7 @@ def test_copy_to_host(size):
dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes)
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_device(size):
arr, buf = make_random_buffer(size=size, target='device')
lst = arr.tolist()
@@ -410,7 +442,7 @@ def test_copy_from_device(size):
put(position=position, nbytes=nbytes)
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_host(size):
arr, buf = make_random_buffer(size=size, target='host')
lst = arr.tolist()
@@ -617,7 +649,7 @@ def other_process_for_test_IPC(handle_buffer, expected_arr):
@cuda_ipc
@pytest.mark.skipif(sys.version_info[0] == 2, reason="test needs Python 3")
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
def test_IPC(size):
import multiprocessing
ctx = multiprocessing.get_context('spawn')
diff --git a/python/pyarrow/tests/test_cuda_numba_interop.py b/python/pyarrow/tests/test_cuda_numba_interop.py
index 296fe2d..ff1722d 100644
--- a/python/pyarrow/tests/test_cuda_numba_interop.py
+++ b/python/pyarrow/tests/test_cuda_numba_interop.py
@@ -78,6 +78,85 @@ def make_random_buffer(size, target='host', dtype='uint8', ctx=None):
@pytest.mark.parametrize("c", range(len(context_choice_ids)),
ids=context_choice_ids)
@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
+@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+def test_from_object(c, dtype, size):
+ ctx, nb_ctx = context_choices[c]
+ arr, cbuf = make_random_buffer(size, target='device', dtype=dtype, ctx=ctx)
+
+ # Creating device buffer from numba DeviceNDArray:
+ darr = nb_cuda.to_device(arr)
+ cbuf2 = ctx.buffer_from_object(darr)
+ assert cbuf2.size == cbuf.size
+ arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+ np.testing.assert_equal(arr, arr2)
+
+ # Creating device buffer from a slice of numba DeviceNDArray:
+ if size >= 8:
+ # 1-D arrays
+ for s in [slice(size//4, None, None),
+ slice(size//4, -(size//4), None)]:
+ cbuf2 = ctx.buffer_from_object(darr[s])
+ arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+ np.testing.assert_equal(arr[s], arr2)
+
+ # cannot test negative strides due to numba bug, see its issue 3705
+ if 0:
+ rdarr = darr[::-1]
+ cbuf2 = ctx.buffer_from_object(rdarr)
+ assert cbuf2.size == cbuf.size
+ arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+ np.testing.assert_equal(arr, arr2)
+
+ with pytest.raises(ValueError,
+ match=('array data is non-contiguous')):
+ ctx.buffer_from_object(darr[::2])
+
+ # a rectangular 2-D array
+ s1 = size//4
+ s2 = size//s1
+ assert s1 * s2 == size
+ cbuf2 = ctx.buffer_from_object(darr.reshape(s1, s2))
+ assert cbuf2.size == cbuf.size
+ arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+ np.testing.assert_equal(arr, arr2)
+
+ with pytest.raises(ValueError,
+ match=('array data is non-contiguous')):
+ ctx.buffer_from_object(darr.reshape(s1, s2)[:, ::2])
+
+ # a 3-D array
+ s1 = 4
+ s2 = size//8
+ s3 = size//(s1*s2)
+ assert s1 * s2 * s3 == size
+ cbuf2 = ctx.buffer_from_object(darr.reshape(s1, s2, s3))
+ assert cbuf2.size == cbuf.size
+ arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+ np.testing.assert_equal(arr, arr2)
+
+ with pytest.raises(ValueError,
+ match=('array data is non-contiguous')):
+ ctx.buffer_from_object(darr.reshape(s1, s2, s3)[::2])
+
+ # Creating device buffer from an object implementing the cuda array
+ # interface:
+ class MyObj:
+ def __init__(self, darr):
+ self.darr = darr
+
+ @property
+ def __cuda_array_interface__(self):
+ return self.darr.__cuda_array_interface__
+
+ cbuf2 = ctx.buffer_from_object(MyObj(darr))
+ assert cbuf2.size == cbuf.size
+ arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+ np.testing.assert_equal(arr, arr2)
+
+
+@pytest.mark.parametrize("c", range(len(context_choice_ids)),
+ ids=context_choice_ids)
+@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
def test_numba_memalloc(c, dtype):
ctx, nb_ctx = context_choices[c]
dtype = np.dtype(dtype)
diff --git a/python/pyarrow/util.py b/python/pyarrow/util.py
index 7cf57d8..6c17f5c 100644
--- a/python/pyarrow/util.py
+++ b/python/pyarrow/util.py
@@ -17,9 +17,11 @@
# Miscellaneous utility code
+import functools
import six
import warnings
+
try:
from textwrap import indent
except ImportError:
@@ -78,3 +80,46 @@ def _stringify_path(path):
return str(path)
raise TypeError("not a path-like object")
+
+
+def product(seq):
+ """
+ Return a product of sequence items.
+ """
+ return functools.reduce(lambda a, b: a*b, seq, 1)
+
+
+def get_contiguous_span(shape, strides, itemsize):
+ """
+ Return a contiguous span of N-D array data.
+
+ Parameters
+ ----------
+ shape : tuple
+ strides : tuple
+ itemsize : int
+ Specify array shape data
+
+ Returns
+ -------
+ start, end : int
+ The span end points.
+ """
+ if not strides:
+ start = 0
+ end = itemsize * product(shape)
+ else:
+ start = 0
+ end = itemsize
+ for i, dim in enumerate(shape):
+ if dim == 0:
+ start = end = 0
+ break
+ stride = strides[i]
+ if stride > 0:
+ end += stride * (dim - 1)
+ elif stride < 0:
+ start += stride * (dim - 1)
+ if end - start != itemsize * product(shape):
+ raise ValueError('array data is non-contiguous')
+ return start, end
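A few values worked out from the definition above may help: start and end are byte offsets relative to the data pointer, and start can be negative when strides are negative (buffer_from_object relies on this via addr + start):

    from pyarrow.util import get_contiguous_span

    # C-contiguous 3x4 array of 8-byte items: spans all 96 bytes.
    get_contiguous_span((3, 4), (32, 8), 8)    # -> (0, 96)

    # Negative stride on the first axis: the span starts below the pointer.
    get_contiguous_span((3, 4), (-32, 8), 8)   # -> (-64, 32)

    # An every-other-column view covers only half of the bytes:
    get_contiguous_span((3, 2), (32, 16), 8)   # raises ValueError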