You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/02/11 22:49:23 UTC

[arrow] branch master updated: ARROW-4498: [Plasma] Fix building Plasma with CUDA enabled

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 18f9e69  ARROW-4498: [Plasma] Fix building Plasma with CUDA enabled
18f9e69 is described below

commit 18f9e692f3bf588e4b062702e79884ca26882dbf
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Feb 11 16:49:05 2019 -0600

    ARROW-4498: [Plasma] Fix building Plasma with CUDA enabled
    
    This fixes the build and tests for me.
    
    Author: Antoine Pitrou <an...@python.org>
    
    Closes #3608 from pitrou/ARROW-4498-plasma-cuda-fix and squashes the following commits:
    
    9090449a <Antoine Pitrou> ARROW-4498:  Fix building Plasma with CUDA enabled
---
 cpp/src/arrow/CMakeLists.txt               |   1 +
 cpp/src/arrow/util/{memory.h => memory.cc} |  11 +--
 cpp/src/arrow/util/memory.h                |  50 +-------------
 cpp/src/plasma/store.cc                    | 105 +++++++++++++++--------------
 cpp/src/plasma/store.h                     |   7 +-
 5 files changed, 66 insertions(+), 108 deletions(-)

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 0815a89..aa09968 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -119,6 +119,7 @@ set(ARROW_SRCS
     util/io-util.cc
     util/logging.cc
     util/key_value_metadata.cc
+    util/memory.cc
     util/task-group.cc
     util/thread-pool.cc
     util/trie.cc
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.cc
similarity index 89%
copy from cpp/src/arrow/util/memory.h
copy to cpp/src/arrow/util/memory.cc
index bc1a526..82ce787 100644
--- a/cpp/src/arrow/util/memory.h
+++ b/cpp/src/arrow/util/memory.cc
@@ -15,18 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#ifndef ARROW_UTIL_MEMORY_H
-#define ARROW_UTIL_MEMORY_H
-
-#include <thread>
 #include <vector>
 
+#include "arrow/util/memory.h"
 #include "arrow/util/thread-pool.h"
 
 namespace arrow {
 namespace internal {
 
-uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
+inline uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
   uintptr_t value = reinterpret_cast<uintptr_t>(address);
   return reinterpret_cast<uint8_t*>(value & bits);
 }
@@ -35,8 +32,6 @@ uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
 // See also: https://sourceforge.net/p/mingw-w64/bugs/767/
 void* wrap_memcpy(void* dst, const void* src, size_t n) { return memcpy(dst, src, n); }
 
-// A helper function for doing memcpy with multiple threads. This is required
-// to saturate the memory bandwidth of modern cpus.
 void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
                       uintptr_t block_size, int num_threads) {
   // XXX This function is really using `num_threads + 1` threads.
@@ -76,5 +71,3 @@ void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
 
 }  // namespace internal
 }  // namespace arrow
-
-#endif  // ARROW_UTIL_MEMORY_H
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h
index bc1a526..62641e3 100644
--- a/cpp/src/arrow/util/memory.h
+++ b/cpp/src/arrow/util/memory.h
@@ -18,61 +18,15 @@
 #ifndef ARROW_UTIL_MEMORY_H
 #define ARROW_UTIL_MEMORY_H
 
-#include <thread>
-#include <vector>
-
-#include "arrow/util/thread-pool.h"
+#include <cstdint>
 
 namespace arrow {
 namespace internal {
 
-uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
-  uintptr_t value = reinterpret_cast<uintptr_t>(address);
-  return reinterpret_cast<uint8_t*>(value & bits);
-}
-
-// This function is just for avoiding MinGW-w64 32bit crash.
-// See also: https://sourceforge.net/p/mingw-w64/bugs/767/
-void* wrap_memcpy(void* dst, const void* src, size_t n) { return memcpy(dst, src, n); }
-
 // A helper function for doing memcpy with multiple threads. This is required
 // to saturate the memory bandwidth of modern cpus.
 void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
-                      uintptr_t block_size, int num_threads) {
-  // XXX This function is really using `num_threads + 1` threads.
-  auto pool = GetCpuThreadPool();
-
-  uint8_t* left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
-  uint8_t* right = pointer_logical_and(src + nbytes, ~(block_size - 1));
-  int64_t num_blocks = (right - left) / block_size;
-
-  // Update right address
-  right = right - (num_blocks % num_threads) * block_size;
-
-  // Now we divide these blocks between available threads. The remainder is
-  // handled separately.
-  size_t chunk_size = (right - left) / num_threads;
-  int64_t prefix = left - src;
-  int64_t suffix = src + nbytes - right;
-  // Now the data layout is | prefix | k * num_threads * block_size | suffix |.
-  // We have chunk_size = k * block_size, therefore the data layout is
-  // | prefix | num_threads * chunk_size | suffix |.
-  // Each thread gets a "chunk" of k blocks.
-
-  // Start all parallel memcpy tasks and handle leftovers while threads run.
-  std::vector<std::future<void*>> futures;
-
-  for (int i = 0; i < num_threads; i++) {
-    futures.emplace_back(pool->Submit(wrap_memcpy, dst + prefix + i * chunk_size,
-                                      left + i * chunk_size, chunk_size));
-  }
-  memcpy(dst, src, prefix);
-  memcpy(dst + prefix + num_threads * chunk_size, right, suffix);
-
-  for (auto& fut : futures) {
-    fut.get();
-  }
-}
+                      uintptr_t block_size, int num_threads);
 
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 05495b7..82f648c 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -148,17 +148,10 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEnt
 }
 
 // Allocate memory
-uint8_t* PlasmaStore::AllocateMemory(int device_num, size_t size, int* fd,
-                                     int64_t* map_size, ptrdiff_t* offset) {
+uint8_t* PlasmaStore::AllocateMemory(size_t size, int* fd, int64_t* map_size,
+                                     ptrdiff_t* offset) {
   // Try to evict objects until there is enough space.
   uint8_t* pointer = nullptr;
-#ifdef PLASMA_CUDA
-  std::shared_ptr<CudaBuffer> gpu_handle;
-  std::shared_ptr<CudaContext> context_;
-  if (device_num != 0) {
-    DCHECK_OK(manager_->GetContext(device_num - 1, &context_));
-  }
-#endif
   while (true) {
     // Allocate space for the new object. We use memalign instead of malloc
     // in order to align the allocated region to a 64-byte boundary. This is not
@@ -167,61 +160,82 @@ uint8_t* PlasmaStore::AllocateMemory(int device_num, size_t size, int* fd,
     // plasma_client.cc). Note that even though this pointer is 64-byte aligned,
     // it is not guaranteed that the corresponding pointer in the client will be
     // 64-byte aligned, but in practice it often will be.
-    if (device_num == 0) {
-      pointer = reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size));
-      if (!pointer) {
-        // Tell the eviction policy how much space we need to create this object.
-        std::vector<ObjectID> objects_to_evict;
-        bool success = eviction_policy_.RequireSpace(size, &objects_to_evict);
-        EvictObjects(objects_to_evict);
-        // Return an error to the client if not enough space could be freed to
-        // create the object.
-        if (!success) {
-          return nullptr;
-        }
-      } else {
-        break;
-      }
-    } else {
-#ifdef PLASMA_CUDA
-      DCHECK_OK(context_->Allocate(data_size + metadata_size, &gpu_handle));
+    pointer = reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size));
+    if (pointer) {
       break;
-#endif
+    }
+    // Tell the eviction policy how much space we need to create this object.
+    std::vector<ObjectID> objects_to_evict;
+    bool success = eviction_policy_.RequireSpace(size, &objects_to_evict);
+    EvictObjects(objects_to_evict);
+    // Return an error to the client if not enough space could be freed to
+    // create the object.
+    if (!success) {
+      return nullptr;
     }
   }
-  if (device_num == 0) {
-    GetMallocMapinfo(pointer, fd, map_size, offset);
-    ARROW_CHECK(*fd != -1);
-  }
+  GetMallocMapinfo(pointer, fd, map_size, offset);
+  ARROW_CHECK(*fd != -1);
   return pointer;
 }
 
+#ifdef PLASMA_CUDA
+Status PlasmaStore::AllocateCudaMemory(
+    int device_num, int64_t size, uint8_t** out_pointer,
+    std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle) {
+  std::shared_ptr<CudaBuffer> cuda_buffer;
+  std::shared_ptr<CudaContext> context_;
+  DCHECK_NE(device_num, 0);
+  RETURN_NOT_OK(manager_->GetContext(device_num - 1, &context_));
+  RETURN_NOT_OK(context_->Allocate(static_cast<int64_t>(size), &cuda_buffer));
+  *out_pointer = cuda_buffer->mutable_data();
+  // The IPC handle will keep the buffer memory alive
+  return cuda_buffer->ExportForIpc(out_ipc_handle);
+}
+#endif
+
 // Create a new object buffer in the hash table.
 PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_size,
                                       int64_t metadata_size, int device_num,
                                       Client* client, PlasmaObject* result) {
   ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
+
   auto entry = GetObjectTableEntry(&store_info_, object_id);
   if (entry != nullptr) {
     // There is already an object with the same ID in the Plasma Store, so
     // ignore this requst.
     return PlasmaError::ObjectExists;
   }
+  auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+  entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get();
+  entry->data_size = data_size;
+  entry->metadata_size = metadata_size;
 
   int fd = -1;
   int64_t map_size = 0;
   ptrdiff_t offset = 0;
-  uint8_t* pointer =
-      AllocateMemory(device_num, data_size + metadata_size, &fd, &map_size, &offset);
-  if (!pointer) {
+  uint8_t* pointer = nullptr;
+  auto total_size = data_size + metadata_size;
+
+  if (device_num != 0) {
+#ifdef PLASMA_CUDA
+    auto st = AllocateCudaMemory(device_num, total_size, &pointer, &entry->ipc_handle);
+    if (!st.ok()) {
+      ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString();
+      return PlasmaError::OutOfMemory;
+    }
+    result->ipc_handle = entry->ipc_handle;
+#else
+    ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled";
     return PlasmaError::OutOfMemory;
+#endif
+  } else {
+    pointer = AllocateMemory(total_size, &fd, &map_size, &offset);
+    if (!pointer) {
+      return PlasmaError::OutOfMemory;
+    }
   }
-  if (!entry) {
-    auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
-    entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get();
-    entry->data_size = data_size;
-    entry->metadata_size = metadata_size;
-  }
+
   entry->pointer = pointer;
   // TODO(pcm): Set the other fields.
   entry->fd = fd;
@@ -231,12 +245,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
   entry->device_num = device_num;
   entry->create_time = std::time(nullptr);
   entry->construct_duration = -1;
-#ifdef PLASMA_CUDA
-  if (device_num != 0) {
-    DCHECK_OK(gpu_handle->ExportForIpc(&entry->ipc_handle));
-    result->ipc_handle = entry->ipc_handle;
-  }
-#endif
 
   result->store_fd = fd;
   result->data_offset = offset;
@@ -419,8 +427,7 @@ void PlasmaStore::ProcessGetRequest(Client* client,
       // Make sure the object pointer is not already allocated
       ARROW_CHECK(!entry->pointer);
 
-      entry->pointer = AllocateMemory(0, /* Only support device_num = 0 */
-                                      entry->data_size + entry->metadata_size, &entry->fd,
+      entry->pointer = AllocateMemory(entry->data_size + entry->metadata_size, &entry->fd,
                                       &entry->map_size, &entry->offset);
       if (entry->pointer) {
         entry->state = ObjectState::PLASMA_CREATED;
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 7105c51..16ae675 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -212,8 +212,11 @@ class PlasmaStore {
   int RemoveFromClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
                                 Client* client);
 
-  uint8_t* AllocateMemory(int device_num, size_t size, int* fd, int64_t* map_size,
-                          ptrdiff_t* offset);
+  uint8_t* AllocateMemory(size_t size, int* fd, int64_t* map_size, ptrdiff_t* offset);
+#ifdef PLASMA_CUDA
+  Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer,
+                            std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle);
+#endif
 
   /// Event loop of the plasma store.
   EventLoop* loop_;