Posted to commits@arrow.apache.org by we...@apache.org on 2019/02/11 22:49:23 UTC
[arrow] branch master updated: ARROW-4498: [Plasma] Fix building Plasma with CUDA enabled
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 18f9e69 ARROW-4498: [Plasma] Fix building Plasma with CUDA enabled
18f9e69 is described below
commit 18f9e692f3bf588e4b062702e79884ca26882dbf
Author: Antoine Pitrou <an...@python.org>
AuthorDate: Mon Feb 11 16:49:05 2019 -0600
ARROW-4498: [Plasma] Fix building Plasma with CUDA enabled
This fixes the build and tests for me.
Author: Antoine Pitrou <an...@python.org>
Closes #3608 from pitrou/ARROW-4498-plasma-cuda-fix and squashes the following commits:
9090449a <Antoine Pitrou> ARROW-4498: Fix building Plasma with CUDA enabled
---
cpp/src/arrow/CMakeLists.txt | 1 +
cpp/src/arrow/util/{memory.h => memory.cc} | 11 +--
cpp/src/arrow/util/memory.h | 50 +-------------
cpp/src/plasma/store.cc | 105 +++++++++++++++--------------
cpp/src/plasma/store.h | 7 +-
5 files changed, 66 insertions(+), 108 deletions(-)
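A likely reason for the memory.h/memory.cc split below (the commit message does not spell it out): the old header defined pointer_logical_and, wrap_memcpy and parallel_memcopy with external linkage, so any build that includes arrow/util/memory.h from more than one translation unit, as the CUDA-enabled Plasma build does, hits duplicate-symbol link errors. A minimal sketch of that failure mode, with hypothetical file names:

// util.h -- hypothetical header that *defines* a function (the old memory.h pattern)
#ifndef UTIL_H
#define UTIL_H
int helper() { return 42; }  // non-inline definition with external linkage
#endif

// a.cc
#include "util.h"
int a() { return helper(); }

// b.cc
#include "util.h"
int b() { return helper(); }

// Linking a.o and b.o fails with "multiple definition of `helper()'".
// The fix mirrored by this commit: keep only `int helper();` in util.h and
// move the body to a single util.cc.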
diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index 0815a89..aa09968 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -119,6 +119,7 @@ set(ARROW_SRCS
util/io-util.cc
util/logging.cc
util/key_value_metadata.cc
+ util/memory.cc
util/task-group.cc
util/thread-pool.cc
util/trie.cc
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.cc
similarity index 89%
copy from cpp/src/arrow/util/memory.h
copy to cpp/src/arrow/util/memory.cc
index bc1a526..82ce787 100644
--- a/cpp/src/arrow/util/memory.h
+++ b/cpp/src/arrow/util/memory.cc
@@ -15,18 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-#ifndef ARROW_UTIL_MEMORY_H
-#define ARROW_UTIL_MEMORY_H
-
-#include <thread>
#include <vector>
+#include "arrow/util/memory.h"
#include "arrow/util/thread-pool.h"
namespace arrow {
namespace internal {
-uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
+inline uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
uintptr_t value = reinterpret_cast<uintptr_t>(address);
return reinterpret_cast<uint8_t*>(value & bits);
}
@@ -35,8 +32,6 @@ uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
// See also: https://sourceforge.net/p/mingw-w64/bugs/767/
void* wrap_memcpy(void* dst, const void* src, size_t n) { return memcpy(dst, src, n); }
-// A helper function for doing memcpy with multiple threads. This is required
-// to saturate the memory bandwidth of modern cpus.
void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
uintptr_t block_size, int num_threads) {
// XXX This function is really using `num_threads + 1` threads.
@@ -76,5 +71,3 @@ void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
} // namespace internal
} // namespace arrow
-
-#endif // ARROW_UTIL_MEMORY_H
diff --git a/cpp/src/arrow/util/memory.h b/cpp/src/arrow/util/memory.h
index bc1a526..62641e3 100644
--- a/cpp/src/arrow/util/memory.h
+++ b/cpp/src/arrow/util/memory.h
@@ -18,61 +18,15 @@
#ifndef ARROW_UTIL_MEMORY_H
#define ARROW_UTIL_MEMORY_H
-#include <thread>
-#include <vector>
-
-#include "arrow/util/thread-pool.h"
+#include <cstdint>
namespace arrow {
namespace internal {
-uint8_t* pointer_logical_and(const uint8_t* address, uintptr_t bits) {
- uintptr_t value = reinterpret_cast<uintptr_t>(address);
- return reinterpret_cast<uint8_t*>(value & bits);
-}
-
-// This function is just for avoiding MinGW-w64 32bit crash.
-// See also: https://sourceforge.net/p/mingw-w64/bugs/767/
-void* wrap_memcpy(void* dst, const void* src, size_t n) { return memcpy(dst, src, n); }
-
// A helper function for doing memcpy with multiple threads. This is required
// to saturate the memory bandwidth of modern cpus.
void parallel_memcopy(uint8_t* dst, const uint8_t* src, int64_t nbytes,
- uintptr_t block_size, int num_threads) {
- // XXX This function is really using `num_threads + 1` threads.
- auto pool = GetCpuThreadPool();
-
- uint8_t* left = pointer_logical_and(src + block_size - 1, ~(block_size - 1));
- uint8_t* right = pointer_logical_and(src + nbytes, ~(block_size - 1));
- int64_t num_blocks = (right - left) / block_size;
-
- // Update right address
- right = right - (num_blocks % num_threads) * block_size;
-
- // Now we divide these blocks between available threads. The remainder is
- // handled separately.
- size_t chunk_size = (right - left) / num_threads;
- int64_t prefix = left - src;
- int64_t suffix = src + nbytes - right;
- // Now the data layout is | prefix | k * num_threads * block_size | suffix |.
- // We have chunk_size = k * block_size, therefore the data layout is
- // | prefix | num_threads * chunk_size | suffix |.
- // Each thread gets a "chunk" of k blocks.
-
- // Start all parallel memcpy tasks and handle leftovers while threads run.
- std::vector<std::future<void*>> futures;
-
- for (int i = 0; i < num_threads; i++) {
- futures.emplace_back(pool->Submit(wrap_memcpy, dst + prefix + i * chunk_size,
- left + i * chunk_size, chunk_size));
- }
- memcpy(dst, src, prefix);
- memcpy(dst + prefix + num_threads * chunk_size, right, suffix);
-
- for (auto& fut : futures) {
- fut.get();
- }
-}
+ uintptr_t block_size, int num_threads);
} // namespace internal
} // namespace arrow
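To make the `| prefix | num_threads * chunk_size | suffix |` layout described in the comments above concrete, here is a standalone sketch (not Arrow code; the address and sizes are invented) that performs the same split arithmetic:

#include <cstdint>
#include <cstdio>

int main() {
  const uintptr_t src = 0x1003;     // hypothetical unaligned source address
  const int64_t nbytes = 1 << 20;   // 1 MiB to copy
  const uintptr_t block_size = 64;  // must be a power of two
  const int num_threads = 4;

  // Round src up, and src + nbytes down, to block boundaries.
  uintptr_t left = (src + block_size - 1) & ~(block_size - 1);
  uintptr_t right = (src + nbytes) & ~(block_size - 1);
  int64_t num_blocks = (right - left) / block_size;

  // Trim the aligned region so it divides evenly among the threads.
  right -= (num_blocks % num_threads) * block_size;
  int64_t chunk_size = (right - left) / num_threads;

  int64_t prefix = left - src;              // leading bytes, copied serially
  int64_t suffix = (src + nbytes) - right;  // trailing bytes, copied serially

  // Layout: | prefix | num_threads * chunk_size | suffix |
  // With the values above this prints: prefix=61 chunk=262080 suffix=195
  // (and 61 + 4 * 262080 + 195 == nbytes).
  printf("prefix=%lld chunk=%lld suffix=%lld\n", (long long)prefix,
         (long long)chunk_size, (long long)suffix);
  return 0;
}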
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 05495b7..82f648c 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -148,17 +148,10 @@ void PlasmaStore::AddToClientObjectIds(const ObjectID& object_id, ObjectTableEnt
}
// Allocate memory
-uint8_t* PlasmaStore::AllocateMemory(int device_num, size_t size, int* fd,
- int64_t* map_size, ptrdiff_t* offset) {
+uint8_t* PlasmaStore::AllocateMemory(size_t size, int* fd, int64_t* map_size,
+ ptrdiff_t* offset) {
// Try to evict objects until there is enough space.
uint8_t* pointer = nullptr;
-#ifdef PLASMA_CUDA
- std::shared_ptr<CudaBuffer> gpu_handle;
- std::shared_ptr<CudaContext> context_;
- if (device_num != 0) {
- DCHECK_OK(manager_->GetContext(device_num - 1, &context_));
- }
-#endif
while (true) {
// Allocate space for the new object. We use memalign instead of malloc
// in order to align the allocated region to a 64-byte boundary. This is not
@@ -167,61 +160,82 @@ uint8_t* PlasmaStore::AllocateMemory(int device_num, size_t size, int* fd,
// plasma_client.cc). Note that even though this pointer is 64-byte aligned,
// it is not guaranteed that the corresponding pointer in the client will be
// 64-byte aligned, but in practice it often will be.
- if (device_num == 0) {
- pointer = reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size));
- if (!pointer) {
- // Tell the eviction policy how much space we need to create this object.
- std::vector<ObjectID> objects_to_evict;
- bool success = eviction_policy_.RequireSpace(size, &objects_to_evict);
- EvictObjects(objects_to_evict);
- // Return an error to the client if not enough space could be freed to
- // create the object.
- if (!success) {
- return nullptr;
- }
- } else {
- break;
- }
- } else {
-#ifdef PLASMA_CUDA
- DCHECK_OK(context_->Allocate(data_size + metadata_size, &gpu_handle));
+ pointer = reinterpret_cast<uint8_t*>(PlasmaAllocator::Memalign(kBlockSize, size));
+ if (pointer) {
break;
-#endif
+ }
+ // Tell the eviction policy how much space we need to create this object.
+ std::vector<ObjectID> objects_to_evict;
+ bool success = eviction_policy_.RequireSpace(size, &objects_to_evict);
+ EvictObjects(objects_to_evict);
+ // Return an error to the client if not enough space could be freed to
+ // create the object.
+ if (!success) {
+ return nullptr;
}
}
- if (device_num == 0) {
- GetMallocMapinfo(pointer, fd, map_size, offset);
- ARROW_CHECK(*fd != -1);
- }
+ GetMallocMapinfo(pointer, fd, map_size, offset);
+ ARROW_CHECK(*fd != -1);
return pointer;
}
+#ifdef PLASMA_CUDA
+Status PlasmaStore::AllocateCudaMemory(
+ int device_num, int64_t size, uint8_t** out_pointer,
+ std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle) {
+ std::shared_ptr<CudaBuffer> cuda_buffer;
+ std::shared_ptr<CudaContext> context_;
+ DCHECK_NE(device_num, 0);
+ RETURN_NOT_OK(manager_->GetContext(device_num - 1, &context_));
+ RETURN_NOT_OK(context_->Allocate(static_cast<int64_t>(size), &cuda_buffer));
+ *out_pointer = cuda_buffer->mutable_data();
+ // The IPC handle will keep the buffer memory alive
+ return cuda_buffer->ExportForIpc(out_ipc_handle);
+}
+#endif
+
// Create a new object buffer in the hash table.
PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_size,
int64_t metadata_size, int device_num,
Client* client, PlasmaObject* result) {
ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
+
auto entry = GetObjectTableEntry(&store_info_, object_id);
if (entry != nullptr) {
// There is already an object with the same ID in the Plasma Store, so
// ignore this request.
return PlasmaError::ObjectExists;
}
+ auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+ entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get();
+ entry->data_size = data_size;
+ entry->metadata_size = metadata_size;
int fd = -1;
int64_t map_size = 0;
ptrdiff_t offset = 0;
- uint8_t* pointer =
- AllocateMemory(device_num, data_size + metadata_size, &fd, &map_size, &offset);
- if (!pointer) {
+ uint8_t* pointer = nullptr;
+ auto total_size = data_size + metadata_size;
+
+ if (device_num != 0) {
+#ifdef PLASMA_CUDA
+ auto st = AllocateCudaMemory(device_num, total_size, &pointer, &entry->ipc_handle);
+ if (!st.ok()) {
+ ARROW_LOG(ERROR) << "Failed to allocate CUDA memory: " << st.ToString();
+ return PlasmaError::OutOfMemory;
+ }
+ result->ipc_handle = entry->ipc_handle;
+#else
+ ARROW_LOG(ERROR) << "device_num != 0 but CUDA not enabled";
return PlasmaError::OutOfMemory;
+#endif
+ } else {
+ pointer = AllocateMemory(total_size, &fd, &map_size, &offset);
+ if (!pointer) {
+ return PlasmaError::OutOfMemory;
+ }
}
- if (!entry) {
- auto ptr = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
- entry = store_info_.objects.emplace(object_id, std::move(ptr)).first->second.get();
- entry->data_size = data_size;
- entry->metadata_size = metadata_size;
- }
+
entry->pointer = pointer;
// TODO(pcm): Set the other fields.
entry->fd = fd;
@@ -231,12 +245,6 @@ PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_si
entry->device_num = device_num;
entry->create_time = std::time(nullptr);
entry->construct_duration = -1;
-#ifdef PLASMA_CUDA
- if (device_num != 0) {
- DCHECK_OK(gpu_handle->ExportForIpc(&entry->ipc_handle));
- result->ipc_handle = entry->ipc_handle;
- }
-#endif
result->store_fd = fd;
result->data_offset = offset;
@@ -419,8 +427,7 @@ void PlasmaStore::ProcessGetRequest(Client* client,
// Make sure the object pointer is not already allocated
ARROW_CHECK(!entry->pointer);
- entry->pointer = AllocateMemory(0, /* Only support device_num = 0 */
- entry->data_size + entry->metadata_size, &entry->fd,
+ entry->pointer = AllocateMemory(entry->data_size + entry->metadata_size, &entry->fd,
&entry->map_size, &entry->offset);
if (entry->pointer) {
entry->state = ObjectState::PLASMA_CREATED;
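One removed line above shows what was presumably the actual compile failure: inside the old AllocateMemory, the PLASMA_CUDA branch called context_->Allocate(data_size + metadata_size, ...), but data_size and metadata_size are parameters of CreateObject, not of AllocateMemory (whose only size parameter is size). The error therefore surfaced exactly when CUDA was enabled. A reduced illustration, with hypothetical names:

// reduced.cc -- not Arrow code; names are made up for illustration
#include <cstdlib>

void* Allocate(std::size_t size) {
#ifdef USE_GPU
  // error: 'data_size' was not declared in this scope. The variable lives in
  // the caller, so this line only fails to compile when the USE_GPU branch is
  // actually built -- i.e. exactly when GPU support is turned on.
  return std::malloc(data_size + metadata_size);
#else
  return std::malloc(size);
#endif
}

Factoring the GPU path into the new Status-returning AllocateCudaMemory keeps those sizes at the call site in CreateObject, which is what the hunks above do.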
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 7105c51..16ae675 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -212,8 +212,11 @@ class PlasmaStore {
int RemoveFromClientObjectIds(const ObjectID& object_id, ObjectTableEntry* entry,
Client* client);
- uint8_t* AllocateMemory(int device_num, size_t size, int* fd, int64_t* map_size,
- ptrdiff_t* offset);
+ uint8_t* AllocateMemory(size_t size, int* fd, int64_t* map_size, ptrdiff_t* offset);
+#ifdef PLASMA_CUDA
+ Status AllocateCudaMemory(int device_num, int64_t size, uint8_t** out_pointer,
+ std::shared_ptr<CudaIpcMemHandle>* out_ipc_handle);
+#endif
/// Event loop of the plasma store.
EventLoop* loop_;