You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by ap...@apache.org on 2019/01/24 13:22:39 UTC

[arrow] branch master updated: ARROW-4212: [C++][Python] CudaBuffer view of arbitrary device memory object

This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new f0c464e  ARROW-4212: [C++][Python] CudaBuffer view of arbitrary device memory object
f0c464e is described below

commit f0c464ea70e8d4bf5a5ed03e95a97327a2057af4
Author: Pearu Peterson <pe...@gmail.com>
AuthorDate: Thu Jan 24 14:22:29 2019 +0100

    ARROW-4212: [C++][Python] CudaBuffer view of arbitrary device memory object
    
    This PR implements the following new features:
    1. `CudaContext::GetDeviceAddress` method
    2. `pyarrow.cuda.Context.get_device_address` method
    3. `pyarrow.cuda.Context.buffer_from_object` method to create a CudaBuffer view of an arbitrary device memory object that implements the `__cuda_array_interface__` attribute or that is a `CudaBuffer`, `CudaHostBuffer` or numba `MemoryPointer` object.
    4. Tests for `buffer_from_object` method
    
    and the following improvements:
    1. `pyarrow.cuda.Context.foreign_buffer` ensures that the used device memory address is valid for the given context.
    
    Author: Pearu Peterson <pe...@gmail.com>
    
    Closes #3439 from pearu/arrow-4212 and squashes the following commits:
    
    ba905739 <Pearu Peterson> Add 2-D and 3-D tests. buffer_from_object requires contiguous memory input.
    355fc035 <Pearu Peterson> Apply more feedback fixes.
    22134da4 <Pearu Peterson> Refactor. Add tests for negative strides and reduce test cases.
    dbd1cec6 <Pearu Peterson> Moved some buffer_from_objects tests under test_cuda.
    562c1805 <Pearu Peterson> Change GetDeviceAddress API to use uint8_t, add C++ tests for it, update docs and apply feedback.
    2726a91e <Pearu Peterson> Add buffer_from_object and get_device_address methods.
    df8656f8 <Pearu Peterson> Add GetDeviceAddress method to CudaContext.
---
 cpp/src/arrow/gpu/cuda-test.cc                  | 14 ++++
 cpp/src/arrow/gpu/cuda_context.cc               |  7 ++
 cpp/src/arrow/gpu/cuda_context.h                | 14 ++++
 python/pyarrow/_cuda.pyx                        | 99 +++++++++++++++++++++++--
 python/pyarrow/includes/libarrow_cuda.pxd       |  1 +
 python/pyarrow/tests/test_cuda.py               | 50 ++++++++++---
 python/pyarrow/tests/test_cuda_numba_interop.py | 79 ++++++++++++++++++++
 python/pyarrow/util.py                          | 45 +++++++++++
 8 files changed, 292 insertions(+), 17 deletions(-)

diff --git a/cpp/src/arrow/gpu/cuda-test.cc b/cpp/src/arrow/gpu/cuda-test.cc
index 5d85a81..628d0f2 100644
--- a/cpp/src/arrow/gpu/cuda-test.cc
+++ b/cpp/src/arrow/gpu/cuda-test.cc
@@ -343,5 +343,19 @@ TEST_F(TestCudaArrowIpc, BasicWriteRead) {
   CompareBatch(*batch, *cpu_batch);
 }
 
+class TestCudaContext : public TestCudaBufferBase {
+ public:
+  void SetUp() { TestCudaBufferBase::SetUp(); }
+};
+
+TEST_F(TestCudaContext, GetDeviceAddress) {
+  const int64_t kSize = 100;
+  std::shared_ptr<CudaBuffer> buffer;
+  uint8_t* devptr = NULL;
+  ASSERT_OK(context_->Allocate(kSize, &buffer));
+  ASSERT_OK(context_->GetDeviceAddress(buffer.get()->mutable_data(), &devptr));
+  ASSERT_EQ(buffer.get()->mutable_data(), devptr);
+}
+
 }  // namespace cuda
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_context.cc b/cpp/src/arrow/gpu/cuda_context.cc
index 9e95040..2f3f1bd 100644
--- a/cpp/src/arrow/gpu/cuda_context.cc
+++ b/cpp/src/arrow/gpu/cuda_context.cc
@@ -343,5 +343,12 @@ void* CudaContext::handle() const { return impl_->context_handle(); }
 
 int CudaContext::device_number() const { return impl_->device().device_num; }
 
+Status CudaContext::GetDeviceAddress(uint8_t* addr, uint8_t** devaddr) {
+  ContextSaver set_temporary(reinterpret_cast<CUcontext>(handle()));
+  CU_RETURN_NOT_OK(cuPointerGetAttribute(devaddr, CU_POINTER_ATTRIBUTE_DEVICE_POINTER,
+                                         reinterpret_cast<CUdeviceptr>(addr)));
+  return Status::OK();
+}
+
 }  // namespace cuda
 }  // namespace arrow
diff --git a/cpp/src/arrow/gpu/cuda_context.h b/cpp/src/arrow/gpu/cuda_context.h
index 8f1b34b..938a815 100644
--- a/cpp/src/arrow/gpu/cuda_context.h
+++ b/cpp/src/arrow/gpu/cuda_context.h
@@ -119,6 +119,20 @@ class ARROW_EXPORT CudaContext : public std::enable_shared_from_this<CudaContext
   /// \brief Return device number
   int device_number() const;
 
+  /// \brief Return the device address that is reachable from kernels
+  /// running in the context
+  /// \param[in] addr device or host memory address
+  /// \param[out] devaddr the device address
+  /// \return Status
+  ///
+  /// The device address is defined as a memory address accessible by
+  /// device. While it is often a device memory address, it can be
+  /// also a host memory address, for instance, when the memory is
+  /// allocated as host memory (using cudaMallocHost or cudaHostAlloc)
+  /// or as managed memory (using cudaMallocManaged) or the host
+  /// memory is page-locked (using cudaHostRegister).
+  Status GetDeviceAddress(uint8_t* addr, uint8_t** devaddr);
+
  private:
   CudaContext();
 
diff --git a/python/pyarrow/_cuda.pyx b/python/pyarrow/_cuda.pyx
index c2d95a6..eac3dae 100644
--- a/python/pyarrow/_cuda.pyx
+++ b/python/pyarrow/_cuda.pyx
@@ -15,10 +15,12 @@
 # specific language governing permissions and limitations
 # under the License.
 
+
 from pyarrow.compat import tobytes
 from pyarrow.lib cimport *
 from pyarrow.includes.libarrow_cuda cimport *
-from pyarrow.lib import py_buffer, allocate_buffer, as_buffer
+from pyarrow.lib import py_buffer, allocate_buffer, as_buffer, ArrowTypeError
+from pyarrow.util import get_contiguous_span
 cimport cpython as cp
 
 
@@ -137,10 +139,40 @@ cdef class Context:
 
     @property
     def bytes_allocated(self):
-        """ Return the number of allocated bytes.
+        """Return the number of allocated bytes.
         """
         return self.context.get().bytes_allocated()
 
+    def get_device_address(self, address):
+        """Return the device address that is reachable from kernels running in
+        the context
+
+        Parameters
+        ----------
+        address : int
+          Specify memory address value
+
+        Returns
+        -------
+        device_address : int
+          Device address accessible from device context
+
+        Notes
+        -----
+        The device address is defined as a memory address accessible
+        by device. While it is often a device memory address, it
+        can also be a host memory address, for instance, when the
+        memory is allocated as host memory (using cudaMallocHost or
+        cudaHostAlloc) or as managed memory (using cudaMallocManaged)
+        or the host memory is page-locked (using cudaHostRegister).
+        """
+        cdef:
+            uintptr_t c_addr = address
+            uint8_t* c_devaddr
+        check_status(self.context.get().GetDeviceAddress(<uint8_t*>c_addr,
+                                                         &c_devaddr))
+        return <uintptr_t>c_devaddr
+
     def new_buffer(self, nbytes):
         """Return new device buffer.
 
@@ -159,26 +191,32 @@ cdef class Context:
         return pyarrow_wrap_cudabuffer(cudabuf)
 
     def foreign_buffer(self, address, size):
-        """Create device buffer from device address and size as a view.
+        """Create device buffer from address and size as a view.
 
         The caller is responsible for allocating and freeing the
-        memory as well as ensuring that the memory belongs to the
-        CUDA context that this Context instance holds.
+        memory. When `address==size==0` then a new zero-sized buffer
+        is returned.
 
         Parameters
         ----------
         address : int
-          Specify the starting address of the buffer.
+          Specify the starting address of the buffer. The address can
+          refer to either device or host memory, but it must be
+          accessible from the device after mapping it with the
+          `get_device_address` method.
         size : int
           Specify the size of device buffer in bytes.
 
         Returns
         -------
         cbuf : CudaBuffer
-          Device buffer as a view of device memory.
+          Device buffer as a view of device reachable memory.
+
         """
+        if not address and size == 0:
+            return self.new_buffer(0)
         cdef:
-            intptr_t c_addr = address
+            uintptr_t c_addr = self.get_device_address(address)
             int64_t c_size = size
             shared_ptr[CCudaBuffer] cudabuf
         check_status(self.context.get().View(<uint8_t*>c_addr,
@@ -246,6 +284,49 @@ cdef class Context:
             result.copy_from_device(buf, position=0, nbytes=size)
         return result
 
+    def buffer_from_object(self, obj):
+        """Create device buffer view of arbitrary object that references
+        device accessible memory.
+
+        Objects implementing the CUDA array interface must describe
+        a contiguous region of device accessible memory. When such
+        an object is a non-contiguous view (for instance, a slice
+        with a step), a ValueError is raised rather than returning a
+        buffer that includes data invisible to the input object.
+
+        Parameters
+        ----------
+        obj : {object, Buffer, HostBuffer, CudaBuffer, ...}
+          Specify an object that holds (device or host) address that
+          can be accessed from device. This includes objects with
+          types defined in pyarrow.cuda as well as arbitrary objects
+          that implement the CUDA array interface as defined by numba.
+
+        Returns
+        -------
+        cbuf : CudaBuffer
+          Device buffer as a view of device accessible memory.
+
+        """
+        if isinstance(obj, HostBuffer):
+            return self.foreign_buffer(obj.address, obj.size)
+        elif isinstance(obj, Buffer):
+            return CudaBuffer.from_buffer(obj)
+        elif isinstance(obj, CudaBuffer):
+            return obj
+        elif hasattr(obj, '__cuda_array_interface__'):
+            desc = obj.__cuda_array_interface__
+            addr = desc['data'][0]
+            if addr is None:
+                return self.new_buffer(0)
+            import numpy as np
+            start, end = get_contiguous_span(
+                desc['shape'], desc.get('strides'),
+                np.dtype(desc['typestr']).itemsize)
+            return self.foreign_buffer(addr + start, end - start)
+        raise ArrowTypeError('cannot create device buffer view from'
+                             ' `%s` object' % (type(obj)))
+
 
 cdef class IpcMemHandle:
     """A serializable container for a CUDA IPC handle.
@@ -343,6 +424,8 @@ cdef class CudaBuffer(Buffer):
           Device buffer as a view of numba MemoryPointer.
         """
         ctx = Context.from_numba(mem.context)
+        if mem.device_pointer.value is None and mem.size==0:
+            return ctx.new_buffer(0)
         return ctx.foreign_buffer(mem.device_pointer.value, mem.size)
 
     def to_numba(self):
diff --git a/python/pyarrow/includes/libarrow_cuda.pxd b/python/pyarrow/includes/libarrow_cuda.pxd
index cedc432..ef89d9c 100644
--- a/python/pyarrow/includes/libarrow_cuda.pxd
+++ b/python/pyarrow/includes/libarrow_cuda.pxd
@@ -46,6 +46,7 @@ cdef extern from "arrow/gpu/cuda_api.h" namespace "arrow::cuda" nogil:
         int64_t bytes_allocated() const
         const void* handle() const
         int device_number() const
+        CStatus GetDeviceAddress(uint8_t* addr, uint8_t** devaddr)
 
     cdef cppclass CCudaIpcMemHandle" arrow::cuda::CudaIpcMemHandle":
         @staticmethod
diff --git a/python/pyarrow/tests/test_cuda.py b/python/pyarrow/tests/test_cuda.py
index 8c86457..4633df1 100644
--- a/python/pyarrow/tests/test_cuda.py
+++ b/python/pyarrow/tests/test_cuda.py
@@ -60,7 +60,7 @@ def test_Context():
         cuda.Context(cuda.Context.get_num_devices())
 
 
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
 def test_manage_allocate_free_host(size):
     buf = cuda.new_host_buffer(size)
     arr = np.frombuffer(buf, dtype=np.uint8)
@@ -102,7 +102,7 @@ def make_random_buffer(size, target='host'):
     raise ValueError('invalid target value')
 
 
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
 def test_context_device_buffer(size):
     # Creating device buffer from host buffer;
     arr, buf = make_random_buffer(size)
@@ -230,7 +230,39 @@ def test_context_device_buffer(size):
     np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)
 
 
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
+def test_context_from_object(size):
+    ctx = global_context
+    arr, cbuf = make_random_buffer(size, target='device')
+    dtype = arr.dtype
+
+    # Creating device buffer from a CUDA host buffer
+    hbuf = cuda.new_host_buffer(size * arr.dtype.itemsize)
+    np.frombuffer(hbuf, dtype=dtype)[:] = arr
+    cbuf2 = ctx.buffer_from_object(hbuf)
+    assert cbuf2.size == cbuf.size
+    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+    np.testing.assert_equal(arr, arr2)
+
+    # Creating device buffer from a device buffer
+    cbuf2 = ctx.buffer_from_object(cbuf2)
+    assert cbuf2.size == cbuf.size
+    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+    np.testing.assert_equal(arr, arr2)
+
+    # Trying to create a device buffer from a Buffer
+    with pytest.raises(pa.ArrowTypeError,
+                       match=('buffer is not backed by a CudaBuffer')):
+        ctx.buffer_from_object(pa.py_buffer(b"123"))
+
+    # Trying to create a device buffer from numpy.array
+    with pytest.raises(pa.ArrowTypeError,
+                       match=('cannot create device buffer view from'
+                              ' `<class \'numpy.ndarray\'>` object')):
+        ctx.buffer_from_object(np.array([1, 2, 3]))
+
+
+@pytest.mark.parametrize("size", [0, 1, 1000])
 def test_CudaBuffer(size):
     arr, buf = make_random_buffer(size)
     assert arr.tobytes() == buf.to_pybytes()
@@ -255,7 +287,7 @@ def test_CudaBuffer(size):
         cuda.CudaBuffer()
 
 
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
 def test_HostBuffer(size):
     arr, buf = make_random_buffer(size)
     assert arr.tobytes() == buf.to_pybytes()
@@ -281,7 +313,7 @@ def test_HostBuffer(size):
         cuda.HostBuffer()
 
 
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
 def test_copy_from_to_host(size):
 
     # Create a buffer in host containing range(size)
@@ -306,7 +338,7 @@ def test_copy_from_to_host(size):
     np.testing.assert_equal(arr, arr2)
 
 
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
 def test_copy_to_host(size):
     arr, dbuf = make_random_buffer(size, target='device')
 
@@ -366,7 +398,7 @@ def test_copy_to_host(size):
             dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes)
 
 
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
 def test_copy_from_device(size):
     arr, buf = make_random_buffer(size=size, target='device')
     lst = arr.tolist()
@@ -410,7 +442,7 @@ def test_copy_from_device(size):
             put(position=position, nbytes=nbytes)
 
 
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
 def test_copy_from_host(size):
     arr, buf = make_random_buffer(size=size, target='host')
     lst = arr.tolist()
@@ -617,7 +649,7 @@ def other_process_for_test_IPC(handle_buffer, expected_arr):
 
 @cuda_ipc
 @pytest.mark.skipif(sys.version_info[0] == 2, reason="test needs Python 3")
-@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+@pytest.mark.parametrize("size", [0, 1, 1000])
 def test_IPC(size):
     import multiprocessing
     ctx = multiprocessing.get_context('spawn')
diff --git a/python/pyarrow/tests/test_cuda_numba_interop.py b/python/pyarrow/tests/test_cuda_numba_interop.py
index 296fe2d..ff1722d 100644
--- a/python/pyarrow/tests/test_cuda_numba_interop.py
+++ b/python/pyarrow/tests/test_cuda_numba_interop.py
@@ -78,6 +78,85 @@ def make_random_buffer(size, target='host', dtype='uint8', ctx=None):
 @pytest.mark.parametrize("c", range(len(context_choice_ids)),
                          ids=context_choice_ids)
 @pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
+@pytest.mark.parametrize("size", [0, 1, 8, 1000])
+def test_from_object(c, dtype, size):
+    ctx, nb_ctx = context_choices[c]
+    arr, cbuf = make_random_buffer(size, target='device', dtype=dtype, ctx=ctx)
+
+    # Creating device buffer from numba DeviceNDArray:
+    darr = nb_cuda.to_device(arr)
+    cbuf2 = ctx.buffer_from_object(darr)
+    assert cbuf2.size == cbuf.size
+    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+    np.testing.assert_equal(arr, arr2)
+
+    # Creating device buffer from a slice of numba DeviceNDArray:
+    if size >= 8:
+        # 1-D arrays
+        for s in [slice(size//4, None, None),
+                  slice(size//4, -(size//4), None)]:
+            cbuf2 = ctx.buffer_from_object(darr[s])
+            arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+            np.testing.assert_equal(arr[s], arr2)
+
+        # cannot test negative strides due to numba bug, see its issue 3705
+        if 0:
+            rdarr = darr[::-1]
+            cbuf2 = ctx.buffer_from_object(rdarr)
+            assert cbuf2.size == cbuf.size
+            arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+            np.testing.assert_equal(arr, arr2)
+
+        with pytest.raises(ValueError,
+                           match=('array data is non-contiguous')):
+            ctx.buffer_from_object(darr[::2])
+
+        # a rectangular 2-D array
+        s1 = size//4
+        s2 = size//s1
+        assert s1 * s2 == size
+        cbuf2 = ctx.buffer_from_object(darr.reshape(s1, s2))
+        assert cbuf2.size == cbuf.size
+        arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+        np.testing.assert_equal(arr, arr2)
+
+        with pytest.raises(ValueError,
+                           match=('array data is non-contiguous')):
+            ctx.buffer_from_object(darr.reshape(s1, s2)[:, ::2])
+
+        # a 3-D array
+        s1 = 4
+        s2 = size//8
+        s3 = size//(s1*s2)
+        assert s1 * s2 * s3 == size
+        cbuf2 = ctx.buffer_from_object(darr.reshape(s1, s2, s3))
+        assert cbuf2.size == cbuf.size
+        arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+        np.testing.assert_equal(arr, arr2)
+
+        with pytest.raises(ValueError,
+                           match=('array data is non-contiguous')):
+            ctx.buffer_from_object(darr.reshape(s1, s2, s3)[::2])
+
+    # Creating device buffer from an object implementing cuda array
+    # interface:
+    class MyObj:
+        def __init__(self, darr):
+            self.darr = darr
+
+        @property
+        def __cuda_array_interface__(self):
+            return self.darr.__cuda_array_interface__
+
+    cbuf2 = ctx.buffer_from_object(MyObj(darr))
+    assert cbuf2.size == cbuf.size
+    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
+    np.testing.assert_equal(arr, arr2)
+
+
+@pytest.mark.parametrize("c", range(len(context_choice_ids)),
+                         ids=context_choice_ids)
+@pytest.mark.parametrize("dtype", dtypes, ids=dtypes)
 def test_numba_memalloc(c, dtype):
     ctx, nb_ctx = context_choices[c]
     dtype = np.dtype(dtype)
diff --git a/python/pyarrow/util.py b/python/pyarrow/util.py
index 7cf57d8..6c17f5c 100644
--- a/python/pyarrow/util.py
+++ b/python/pyarrow/util.py
@@ -17,9 +17,11 @@
 
 # Miscellaneous utility code
 
+import functools
 import six
 import warnings
 
+
 try:
     from textwrap import indent
 except ImportError:
@@ -78,3 +80,46 @@ def _stringify_path(path):
             return str(path)
 
     raise TypeError("not a path-like object")
+
+
+def product(seq):
+    """
+    Return a product of sequence items.
+    """
+    return functools.reduce(lambda a, b: a*b, seq, 1)
+
+
+def get_contiguous_span(shape, strides, itemsize):
+    """
+    Return a contiguous span of N-D array data.
+
+    Parameters
+    ----------
+    shape : tuple
+    strides : tuple
+    itemsize : int
+      Specify the array shape, strides and item size (same units)
+
+    Returns
+    -------
+    start, end : int
+      The span end points.
+    """
+    if not strides:
+        start = 0
+        end = itemsize * product(shape)
+    else:
+        start = 0
+        end = itemsize
+        for i, dim in enumerate(shape):
+            if dim == 0:
+                start = end = 0
+                break
+            stride = strides[i]
+            if stride > 0:
+                end += stride * (dim - 1)
+            elif stride < 0:
+                start += stride * (dim - 1)
+        if end - start != itemsize * product(shape):
+            raise ValueError('array data is non-contiguous')
+    return start, end