You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/23 18:39:21 UTC
[14/14] arrow git commit: [C++] Restore Plasma source tree after
0.5.0 release
[C++] Restore Plasma source tree after 0.5.0 release
This reverts commit 62ef2cd8a39fc93e7fa4bb790d7cd92adb77571f.
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2c810151
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2c810151
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2c810151
Branch: refs/heads/master
Commit: 2c8101515e2a0ab515c03f82dc84b02ca6c466da
Parents: 9b26ed8
Author: Wes McKinney <we...@twosigma.com>
Authored: Sun Jul 23 14:37:54 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sun Jul 23 14:37:54 2017 -0400
----------------------------------------------------------------------
cpp/src/plasma/CMakeLists.txt | 113 +
cpp/src/plasma/client.cc | 557 ++
cpp/src/plasma/client.h | 343 ++
cpp/src/plasma/common.cc | 83 +
cpp/src/plasma/common.h | 63 +
cpp/src/plasma/events.cc | 81 +
cpp/src/plasma/events.h | 99 +
cpp/src/plasma/eviction_policy.cc | 107 +
cpp/src/plasma/eviction_policy.h | 134 +
cpp/src/plasma/extension.cc | 456 ++
cpp/src/plasma/extension.h | 50 +
cpp/src/plasma/fling.cc | 90 +
cpp/src/plasma/fling.h | 52 +
cpp/src/plasma/format/.gitignore | 1 +
cpp/src/plasma/format/common.fbs | 34 +
cpp/src/plasma/format/plasma.fbs | 291 ++
cpp/src/plasma/io.cc | 212 +
cpp/src/plasma/io.h | 55 +
cpp/src/plasma/malloc.cc | 178 +
cpp/src/plasma/malloc.h | 26 +
cpp/src/plasma/plasma.cc | 64 +
cpp/src/plasma/plasma.h | 191 +
cpp/src/plasma/protocol.cc | 502 ++
cpp/src/plasma/protocol.h | 170 +
cpp/src/plasma/store.cc | 683 +++
cpp/src/plasma/store.h | 169 +
cpp/src/plasma/test/client_tests.cc | 132 +
cpp/src/plasma/test/run_tests.sh | 61 +
cpp/src/plasma/test/run_valgrind.sh | 27 +
cpp/src/plasma/test/serialization_tests.cc | 388 ++
cpp/src/plasma/thirdparty/ae/ae.c | 465 ++
cpp/src/plasma/thirdparty/ae/ae.h | 123 +
cpp/src/plasma/thirdparty/ae/ae_epoll.c | 135 +
cpp/src/plasma/thirdparty/ae/ae_evport.c | 320 ++
cpp/src/plasma/thirdparty/ae/ae_kqueue.c | 138 +
cpp/src/plasma/thirdparty/ae/ae_select.c | 106 +
cpp/src/plasma/thirdparty/ae/config.h | 54 +
cpp/src/plasma/thirdparty/ae/zmalloc.h | 45 +
cpp/src/plasma/thirdparty/dlmalloc.c | 6281 +++++++++++++++++++++++
cpp/src/plasma/thirdparty/xxhash.cc | 889 ++++
cpp/src/plasma/thirdparty/xxhash.h | 293 ++
41 files changed, 14261 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
new file mode 100644
index 0000000..4ff3beb
--- /dev/null
+++ b/cpp/src/plasma/CMakeLists.txt
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8)
+
+project(plasma)
+
+# Declared before any Python lookup so that the lookup can depend on it.
+option(PLASMA_PYTHON
+  "Build the Plasma Python extensions"
+  OFF)
+
+# The only Python-specific target in this file is plasma_extension, which is
+# built solely when PLASMA_PYTHON is ON. Requiring the Python development
+# files unconditionally made configuration fail on machines without them even
+# when the extension was not requested, so the lookup is now conditional.
+if(PLASMA_PYTHON)
+  find_package(PythonLibsNew REQUIRED)
+  include_directories(SYSTEM ${PYTHON_INCLUDE_DIRS})
+endif()
+
+find_package(Threads)
+
+# Give shared libraries the ".so" suffix on macOS as well.
+if(APPLE)
+  set(CMAKE_SHARED_LIBRARY_SUFFIX ".so")
+endif()
+
+# Header search path: flatbuffers, this directory, the bundled third-party
+# sources and the parent directory (so "plasma/..." includes resolve).
+include_directories(
+  "${FLATBUFFERS_INCLUDE_DIR}"
+  "${CMAKE_CURRENT_LIST_DIR}/"
+  "${CMAKE_CURRENT_LIST_DIR}/thirdparty/"
+  "${CMAKE_CURRENT_LIST_DIR}/../")
+
+# Feature-test macros for the C++ sources.
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L")
+
+# Do not warn about implicit conversions in the C sources.
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion")
+
+# Compile flatbuffers
+#
+# The schemas under format/ are compiled by flatc into C++ headers that are
+# written next to the schemas. The gen_plasma_fbs target lets other targets
+# depend on the generated headers.
+
+set(PLASMA_FBS_SRC
+  "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs"
+  "${CMAKE_CURRENT_LIST_DIR}/format/common.fbs")
+set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/format/)
+
+set(PLASMA_FBS_OUTPUT_FILES
+  "${OUTPUT_DIR}/common_generated.h"
+  "${OUTPUT_DIR}/plasma_generated.h")
+
+# The --gen-object-api flag generates a C++ class MessageT for each
+# flatbuffers message Message, which can be used to store deserialized
+# messages in data structures. This is currently used for ObjectInfo for
+# example.
+add_custom_command(
+  OUTPUT ${PLASMA_FBS_OUTPUT_FILES}
+  COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR} ${PLASMA_FBS_SRC} --gen-object-api
+  DEPENDS ${PLASMA_FBS_SRC}
+  COMMENT "Running flatc compiler on ${PLASMA_FBS_SRC}"
+  VERBATIM)
+
+add_custom_target(gen_plasma_fbs DEPENDS ${PLASMA_FBS_OUTPUT_FILES})
+
+# With a vendored flatbuffers, flatbuffers_ep must be built before the
+# schemas are compiled.
+if(FLATBUFFERS_VENDORED)
+  add_dependencies(gen_plasma_fbs flatbuffers_ep)
+endif()
+
+# Link librt on non-Apple Unix platforms. link_libraries() applies to every
+# target created after this point.
+if(UNIX AND NOT APPLE)
+  link_libraries(rt)
+endif()
+
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
+
+# Per-source compile flags.
+set_source_files_properties(extension.cc
+  PROPERTIES COMPILE_FLAGS -Wno-strict-aliasing)
+# The optimization flag -O3 is suggested by dlmalloc.c, which is #included in
+# malloc.cc; we set it here regardless of whether we do a debug or release build.
+set_source_files_properties(malloc.cc
+  PROPERTIES COMPILE_FLAGS "-Wno-error -O3")
+
+set(PLASMA_SRCS
+  client.cc
+  common.cc
+  eviction_policy.cc
+  events.cc
+  fling.cc
+  io.cc
+  malloc.cc
+  plasma.cc
+  protocol.cc
+  thirdparty/ae/ae.c
+  thirdparty/xxhash.cc)
+
+# The library depends on gen_plasma_fbs so the generated flatbuffers headers
+# exist before the sources are compiled.
+ADD_ARROW_LIB(plasma
+  SOURCES ${PLASMA_SRCS}
+  DEPENDENCIES gen_plasma_fbs
+  SHARED_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} arrow_static
+  STATIC_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} arrow_static)
+
+# The Plasma store executable.
+add_executable(plasma_store store.cc)
+target_link_libraries(plasma_store plasma_static)
+
+# Unit tests, each linked against the static Plasma library.
+foreach(plasma_test_name serialization_tests client_tests)
+  ADD_ARROW_TEST(test/${plasma_test_name})
+  ARROW_TEST_LINK_LIBRARIES(test/${plasma_test_name} plasma_static)
+endforeach()
+
+# Optional Python extension module.
+if(PLASMA_PYTHON)
+  add_library(plasma_extension SHARED extension.cc)
+
+  if(NOT APPLE)
+    # Pull the whole flatbuffers archive into the extension.
+    target_link_libraries(plasma_extension plasma_static -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive)
+  else()
+    # On macOS, leave undefined symbols to be resolved when the module is loaded.
+    target_link_libraries(plasma_extension plasma_static "-undefined dynamic_lookup")
+  endif()
+endif()
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
new file mode 100644
index 0000000..dcb78e7
--- /dev/null
+++ b/cpp/src/plasma/client.cc
@@ -0,0 +1,557 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// PLASMA CLIENT: Client library for using the plasma store and manager
+
+#include "plasma/client.h"
+
+#ifdef _WIN32
+#include <Win32_Interop/win32_types.h>
+#endif
+
+#include <assert.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <strings.h>
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <thread>
+#include <vector>
+
+#include "plasma/common.h"
+#include "plasma/fling.h"
+#include "plasma/io.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+
+#define XXH_STATIC_LINKING_ONLY
+#include "thirdparty/xxhash.h"
+
+#define XXH64_DEFAULT_SEED 0
+
+// Number of threads used for memcopy and hash computations.
+constexpr int64_t kThreadPoolSize = 8;
+constexpr int64_t kBytesInMB = 1 << 20;
+static std::vector<std::thread> threadpool_(kThreadPoolSize);
+
+// Return the base address of the memory-mapped store file identified by
+// store_fd_val. If this client has mapped the file before, the cached pointer
+// is returned; otherwise fd is mmapped with map_size bytes and the mapping is
+// recorded in mmap_table_ with a use count of zero. In both cases fd is
+// closed before returning.
+uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size) {
+  auto cached = mmap_table_.find(store_fd_val);
+  if (cached != mmap_table_.end()) {
+    // Already mapped: the descriptor we were handed is not needed.
+    close(fd);
+    return cached->second.pointer;
+  }
+
+  uint8_t* base = reinterpret_cast<uint8_t*>(
+      mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
+  // TODO(pcm): Don't fail here, instead return a Status.
+  if (base == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; }
+  close(fd);
+
+  // Record the new mapping so later lookups can reuse it.
+  ClientMmapTableEntry& record = mmap_table_[store_fd_val];
+  record.pointer = base;
+  record.length = map_size;
+  record.count = 0;
+  return base;
+}
+
+// Return the base address of a store file that this client has already
+// memory mapped. Aborts (ARROW_CHECK) if store_fd_val is not in mmap_table_.
+uint8_t* PlasmaClient::lookup_mmapped_file(int store_fd_val) {
+  auto mapping = mmap_table_.find(store_fd_val);
+  const bool is_mapped = (mapping != mmap_table_.end());
+  ARROW_CHECK(is_mapped);
+  return mapping->second.pointer;
+}
+
+// Record one more use of object_id by this client.
+//
+// The first time an object is seen, an entry is added to objects_in_use_, the
+// use count of its memory-mapped file is bumped, and its size is added to
+// in_use_object_bytes_. Every call then increments the per-object count. The
+// matching decrements happen in PlasmaClient::PerformRelease.
+void PlasmaClient::increment_object_count(
+    const ObjectID& object_id, PlasmaObject* object, bool is_sealed) {
+  ObjectInUseEntry* in_use = nullptr;
+  auto known = objects_in_use_.find(object_id);
+  if (known != objects_in_use_.end()) {
+    // Already tracked: it must have at least one outstanding use.
+    in_use = known->second.get();
+    ARROW_CHECK(in_use->count > 0);
+  } else {
+    // Start tracking this object ID. The entry is removed again in
+    // PlasmaClient::PerformRelease.
+    std::unique_ptr<ObjectInUseEntry>& slot = objects_in_use_[object_id];
+    slot = std::unique_ptr<ObjectInUseEntry>(new ObjectInUseEntry());
+    in_use = slot.get();
+    in_use->object = *object;
+    in_use->count = 0;
+    in_use->is_sealed = is_sealed;
+    // Count one more object in use inside the memory-mapped file.
+    auto mapping = mmap_table_.find(object->handle.store_fd);
+    ARROW_CHECK(mapping != mmap_table_.end());
+    ARROW_CHECK(mapping->second.count >= 0);
+    // Update the in_use_object_bytes_.
+    in_use_object_bytes_ += (in_use->object.data_size + in_use->object.metadata_size);
+    mapping->second.count += 1;
+  }
+  // One more instance of this object is in use by this client.
+  in_use->count += 1;
+}
+
+// Ask the store to create an object and map its buffer into this process.
+//
+// object_id: ID of the object to create.
+// data_size: size in bytes of the data section.
+// metadata: optional metadata to copy into the buffer; may be NULL, in which
+//   case nothing is copied.
+// metadata_size: size in bytes of the metadata section.
+// data: output; set to the address of the data section. The metadata section
+//   starts data_size bytes after it.
+//
+// Returns a non-OK Status if sending the request or reading the reply fails.
+// Inconsistencies in the reply, or a failure to receive the file descriptor,
+// abort via ARROW_CHECK.
+Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
+ uint8_t* metadata, int64_t metadata_size, uint8_t** data) {
+ ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
+ << data_size << " and metadata size " << metadata_size;
+ RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, metadata_size));
+ std::vector<uint8_t> buffer;
+ RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
+ ObjectID id;
+ PlasmaObject object;
+ RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
+ // If the CreateReply included an error, then the store will not send a file
+ // descriptor, so we only try to receive one after the reply was read
+ // successfully.
+ int fd = recv_fd(store_conn_);
+ ARROW_CHECK(fd >= 0) << "recv not successful";
+ ARROW_CHECK(object.data_size == data_size);
+ ARROW_CHECK(object.metadata_size == metadata_size);
+ // The metadata should come right after the data.
+ ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
+ *data = lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
+ object.data_offset;
+ // If plasma_create is being called from a transfer, then we will not copy the
+ // metadata here. The metadata will be written along with the data streamed
+ // from the transfer.
+ if (metadata != NULL) {
+ // Copy the metadata to the buffer.
+ memcpy(*data + object.data_size, metadata, metadata_size);
+ }
+ // Increment the count of the number of instances of this object that this
+ // client is using. A call to PlasmaClient::Release is required to decrement
+ // this count. Cache the reference to the object.
+ increment_object_count(object_id, &object, false);
+ // We increment the count a second time (and the corresponding decrement will
+ // happen in a PlasmaClient::Release call in PlasmaClient::Seal) so even if
+ // the buffer returned by PlasmaClient::Create goes out of scope, the object
+ // does not get released before the call to PlasmaClient::Seal happens.
+ increment_object_count(object_id, &object, false);
+ return Status::OK();
+}
+
+// Fetch num_objects objects from the store and fill in object_buffers.
+//
+// object_ids: array of num_objects IDs to retrieve.
+// num_objects: number of entries in object_ids and object_buffers.
+// timeout_ms: timeout forwarded to the store in the get request.
+// object_buffers: output array. For each retrieved object the data/metadata
+//   pointers and sizes are filled in and the caller must later call
+//   PlasmaClient::Release for it. For an object that was not retrieved,
+//   data_size is set to -1 and no release is required.
+//
+// Returns a non-OK Status if the request cannot be sent or the reply cannot
+// be read; descriptor-passing failures abort via ARROW_CHECK.
+Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
+    int64_t timeout_ms, ObjectBuffer* object_buffers) {
+  // Fill out the info for the objects that are already in use locally.
+  bool all_present = true;
+  // The index is int64_t to match the type of num_objects.
+  for (int64_t i = 0; i < num_objects; ++i) {
+    auto object_entry = objects_in_use_.find(object_ids[i]);
+    if (object_entry == objects_in_use_.end()) {
+      // This object is not currently in use by this client, so we need to send
+      // a request to the store.
+      all_present = false;
+      // Make a note to ourselves that the object is not present.
+      object_buffers[i].data_size = -1;
+    } else {
+      // NOTE: If the object is still unsealed, we will deadlock, since we must
+      // have been the one who created it.
+      ARROW_CHECK(object_entry->second->is_sealed)
+          << "Plasma client called get on an unsealed object that it created";
+      PlasmaObject* object = &object_entry->second->object;
+      object_buffers[i].data = lookup_mmapped_file(object->handle.store_fd);
+      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      object_buffers[i].data_size = object->data_size;
+      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
+      object_buffers[i].metadata_size = object->metadata_size;
+      // Increment the count of the number of instances of this object that this
+      // client is using. A call to PlasmaClient::Release is required to
+      // decrement this count. Cache the reference to the object.
+      increment_object_count(object_ids[i], object, true);
+    }
+  }
+
+  if (all_present) { return Status::OK(); }
+
+  // If we get here, then the objects aren't all currently in use by this
+  // client, so we need to send a request to the plasma store.
+  RETURN_NOT_OK(SendGetRequest(store_conn_, object_ids, num_objects, timeout_ms));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaGetReply, &buffer));
+  std::vector<ObjectID> received_object_ids(num_objects);
+  std::vector<PlasmaObject> object_data(num_objects);
+  PlasmaObject* object;
+  RETURN_NOT_OK(ReadGetReply(
+      buffer.data(), received_object_ids.data(), object_data.data(), num_objects));
+
+  for (int64_t i = 0; i < num_objects; ++i) {
+    DCHECK(received_object_ids[i] == object_ids[i]);
+    object = &object_data[i];
+    if (object_buffers[i].data_size != -1) {
+      // If the object was already in use by the client, then the store should
+      // have returned it.
+      DCHECK_NE(object->data_size, -1);
+      // We won't use this file descriptor, but the store sent us one, so we
+      // need to receive it and then close it right away so we don't leak file
+      // descriptors. Validate the descriptor before closing it, so that a
+      // failed receive is reported instead of an invalid value being closed.
+      int fd = recv_fd(store_conn_);
+      ARROW_CHECK(fd >= 0);
+      close(fd);
+      // We've already filled out the information for this object, so we can
+      // just continue.
+      continue;
+    }
+    // If we are here, the object was not currently in use, so we need to
+    // process the reply from the object store.
+    if (object->data_size != -1) {
+      // The object was retrieved. The user will be responsible for releasing
+      // this object.
+      int fd = recv_fd(store_conn_);
+      ARROW_CHECK(fd >= 0);
+      object_buffers[i].data =
+          lookup_or_mmap(fd, object->handle.store_fd, object->handle.mmap_size);
+      // Finish filling out the return values.
+      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      object_buffers[i].data_size = object->data_size;
+      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
+      object_buffers[i].metadata_size = object->metadata_size;
+      // Increment the count of the number of instances of this object that this
+      // client is using. A call to PlasmaClient::Release is required to
+      // decrement this count. Cache the reference to the object.
+      increment_object_count(received_object_ids[i], object, true);
+    } else {
+      // The object was not retrieved. Make sure we already put a -1 here to
+      // indicate that the object was not retrieved. The caller is not
+      // responsible for releasing this object.
+      DCHECK_EQ(object_buffers[i].data_size, -1);
+      object_buffers[i].data_size = -1;
+    }
+  }
+  return Status::OK();
+}
+
+/// Helper that performs one deferred release. PlasmaClient::Release keeps a
+/// history of release calls and only calls this method once that history
+/// becomes too long or too many bytes are in use. There may be multiple
+/// release calls for the same object ID in the history. In this case, the
+/// first release calls only decrement the local count. The client only sends
+/// a message to the store releasing the object when the local count reaches
+/// zero, i.e. when the client is truly done with the object.
+///
+/// @param object_id The object ID to attempt to release.
+/// @return A non-OK Status if the release request cannot be sent to the
+///         store; Status::OK() otherwise.
+Status PlasmaClient::PerformRelease(const ObjectID& object_id) {
+ // Decrement the count of the number of instances of this object that are
+ // being used by this client. The corresponding increment happened in
+ // PlasmaClient::increment_object_count.
+ auto object_entry = objects_in_use_.find(object_id);
+ ARROW_CHECK(object_entry != objects_in_use_.end());
+ object_entry->second->count -= 1;
+ ARROW_CHECK(object_entry->second->count >= 0);
+ // Check if the client is no longer using this object.
+ if (object_entry->second->count == 0) {
+ // Decrement the count of the number of objects in this memory-mapped file
+ // that the client is using. The corresponding increment happened in
+ // PlasmaClient::increment_object_count.
+ int fd = object_entry->second->object.handle.store_fd;
+ auto entry = mmap_table_.find(fd);
+ ARROW_CHECK(entry != mmap_table_.end());
+ entry->second.count -= 1;
+ ARROW_CHECK(entry->second.count >= 0);
+ // If none are being used then unmap the file.
+ if (entry->second.count == 0) {
+ munmap(entry->second.pointer, entry->second.length);
+ // Remove the corresponding entry from the hash table.
+ mmap_table_.erase(fd);
+ }
+ // Tell the store that the client no longer needs the object.
+ // NOTE(review): if this send fails we return early, after the mmap table
+ // was already updated but before objects_in_use_ and in_use_object_bytes_
+ // are; confirm callers treat such a failure as fatal for the connection.
+ RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
+ // Update the in_use_object_bytes_.
+ in_use_object_bytes_ -= (object_entry->second->object.data_size +
+ object_entry->second->object.metadata_size);
+ DCHECK_GE(in_use_object_bytes_, 0);
+ // Remove the entry from the hash table of objects currently in use.
+ objects_in_use_.erase(object_id);
+ }
+ return Status::OK();
+}
+
+// Queue a release of object_id and flush the oldest queued releases.
+//
+// Releases are deferred: the ID is pushed onto release_history_, and the
+// oldest entries are handed to PerformRelease only while the client holds too
+// many bytes or has too many pending releases. Returns the first non-OK
+// Status produced by PerformRelease, otherwise Status::OK().
+Status PlasmaClient::Release(const ObjectID& object_id) {
+  // Add the new object to the release history.
+  release_history_.push_front(object_id);
+  while (release_history_.size() > 0) {
+    // Both limits are re-evaluated on every iteration because PerformRelease
+    // may change in_use_object_bytes_.
+    const bool too_many_bytes =
+        in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 100);
+    const bool too_many_pending = release_history_.size() > config_.release_delay;
+    if (!too_many_bytes && !too_many_pending) { break; }
+    // Perform the oldest pending release, then drop it from the history.
+    RETURN_NOT_OK(PerformRelease(release_history_.back()));
+    release_history_.pop_back();
+  }
+  return Status::OK();
+}
+
+// Query whether the plasma store contains object_id.
+//
+// *has_object is set to true without contacting the store when this client
+// already holds a reference to the object; otherwise the store is asked and
+// its answer is written to *has_object. Returns a non-OK Status if the
+// request/reply exchange fails.
+Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
+  // A local reference means the object is present.
+  if (objects_in_use_.count(object_id) > 0) {
+    *has_object = true;
+    return Status::OK();
+  }
+  // No local reference, so check with the store.
+  RETURN_NOT_OK(SendContainsRequest(store_conn_, object_id));
+  std::vector<uint8_t> reply;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &reply));
+  ObjectID replied_id;
+  RETURN_NOT_OK(ReadContainsReply(reply.data(), &replied_id, has_object));
+  return Status::OK();
+}
+
+// Compute the XXH64 hash (seed XXH64_DEFAULT_SEED) of the nbytes bytes at
+// data and store it in *hash. Used as the thread entry point in
+// compute_object_hash_parallel.
+static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) {
+  XXH64_state_t block_state;
+  XXH64_reset(&block_state, XXH64_DEFAULT_SEED);
+  XXH64_update(&block_state, data, nbytes);
+  const uint64_t block_digest = XXH64_digest(&block_state);
+  *hash = block_digest;
+}
+
+// Feed a hash of data[0, nbytes) into hash_state, computing it with
+// kThreadPoolSize worker threads.
+//
+// The data is split into kThreadPoolSize equally sized chunks (each a multiple
+// of BLOCK_SIZE) plus a suffix. Each chunk is hashed on its own thread, the
+// suffix is hashed on the calling thread, and the resulting array of
+// per-chunk hashes is what gets added to hash_state. The result therefore
+// differs from feeding the raw bytes to hash_state directly.
+//
+// Always returns true.
+//
+// NOTE(review): the worker threads are stored in the file-level static
+// threadpool_, so concurrent calls would appear to share it; confirm callers
+// never run this concurrently.
+static inline bool compute_object_hash_parallel(
+ XXH64_state_t* hash_state, const unsigned char* data, int64_t nbytes) {
+ // Note that this function will likely be faster if the address of data is
+ // aligned on a 64-byte boundary.
+ const int num_threads = kThreadPoolSize;
+ // One slot per worker thread plus one for the suffix.
+ uint64_t threadhash[num_threads + 1];
+ const uint64_t data_address = reinterpret_cast<uint64_t>(data);
+ const uint64_t num_blocks = nbytes / BLOCK_SIZE;
+ const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
+ const uint64_t right_address = data_address + chunk_size * num_threads;
+ const uint64_t suffix = (data_address + nbytes) - right_address;
+ // Now the data layout is | k * num_threads * block_size | suffix | ==
+ // | num_threads * chunk_size | suffix |, where chunk_size = k * block_size.
+ // Each thread gets a "chunk" of k blocks, except the suffix thread.
+
+ for (int i = 0; i < num_threads; i++) {
+ threadpool_[i] = std::thread(ComputeBlockHash,
+ reinterpret_cast<uint8_t*>(data_address) + i * chunk_size, chunk_size,
+ &threadhash[i]);
+ }
+ // Hash the suffix on the calling thread while the workers run.
+ ComputeBlockHash(
+ reinterpret_cast<uint8_t*>(right_address), suffix, &threadhash[num_threads]);
+
+ // Join the threads.
+ for (auto& t : threadpool_) {
+ if (t.joinable()) { t.join(); }
+ }
+
+ // Combine the per-chunk hashes by hashing the array that holds them.
+ XXH64_update(hash_state, (unsigned char*)threadhash, sizeof(threadhash));
+ return true;
+}
+
+// Return the XXH64 digest of an object: the data section followed by the
+// metadata section. Data sections of at least kBytesInMB bytes are hashed
+// with compute_object_hash_parallel, smaller ones are hashed directly.
+static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
+  XXH64_state_t digest_state;
+  XXH64_reset(&digest_state, XXH64_DEFAULT_SEED);
+
+  unsigned char* data_bytes = (unsigned char*)obj_buffer.data;
+  const bool hash_in_parallel = obj_buffer.data_size >= kBytesInMB;
+  if (!hash_in_parallel) {
+    XXH64_update(&digest_state, data_bytes, obj_buffer.data_size);
+  } else {
+    compute_object_hash_parallel(&digest_state, data_bytes, obj_buffer.data_size);
+  }
+
+  unsigned char* metadata_bytes = (unsigned char*)obj_buffer.metadata;
+  XXH64_update(&digest_state, metadata_bytes, obj_buffer.metadata_size);
+  return XXH64_digest(&digest_state);
+}
+
+// Compute the hash of the object object_id and copy it into digest.
+//
+// The object is fetched with a timeout of 0, hashed, and released again.
+// Returns false (leaving digest untouched) if the object could not be
+// retrieved, true otherwise. Only sizeof(uint64_t) bytes of digest are
+// written. Failures of Get or Release abort via ARROW_CHECK_OK.
+bool plasma_compute_object_hash(
+    PlasmaClient* conn, ObjectID object_id, unsigned char* digest) {
+  // A timeout of 0 makes the get return immediately.
+  ObjectBuffer fetched;
+  ARROW_CHECK_OK(conn->Get(&object_id, 1, 0, &fetched));
+  const bool retrieved = (fetched.data_size != -1);
+  if (!retrieved) { return false; }
+  // Hash the object and hand the raw hash bytes back to the caller.
+  uint64_t object_hash = compute_object_hash(fetched);
+  memcpy(digest, &object_hash, sizeof(object_hash));
+  // Drop the reference taken by Get.
+  ARROW_CHECK_OK(conn->Release(object_id));
+  return true;
+}
+
+// Seal the object object_id, which this client must have created and not yet
+// sealed.
+//
+// Marks the local entry as sealed, computes the object's hash, sends the seal
+// request to the store and finally releases the extra reference taken in
+// PlasmaClient::Create. Calling this without a reference to the object, or on
+// an already sealed object, aborts via ARROW_CHECK. Returns a non-OK Status
+// if the seal request or the release fails.
+Status PlasmaClient::Seal(const ObjectID& object_id) {
+ // Make sure this client has a reference to the object before sending the
+ // request to Plasma.
+ auto object_entry = objects_in_use_.find(object_id);
+ ARROW_CHECK(object_entry != objects_in_use_.end())
+ << "Plasma client called seal an object without a reference to it";
+ ARROW_CHECK(!object_entry->second->is_sealed)
+ << "Plasma client called seal an already sealed object";
+ // The entry must be marked sealed before the hash is computed, because
+ // plasma_compute_object_hash calls Get, which checks is_sealed.
+ object_entry->second->is_sealed = true;
+ /// Send the seal request to Plasma.
+ // NOTE(review): digest has static storage, so it is shared by all calls and
+ // starts zero-initialized; plasma_compute_object_hash only writes
+ // sizeof(uint64_t) bytes of it. Confirm Seal is never called concurrently.
+ static unsigned char digest[kDigestSize];
+ ARROW_CHECK(plasma_compute_object_hash(this, object_id, &digest[0]));
+ RETURN_NOT_OK(SendSealRequest(store_conn_, object_id, &digest[0]));
+ // We call PlasmaClient::Release to decrement the number of instances of this
+ // object that are currently being used by this client. The corresponding
+ // increment happened in PlasmaClient::Create and was used to ensure that the
+ // object was not released before the call to PlasmaClient::Seal.
+ return Release(object_id);
+}
+
+// Placeholder for deleting an object from the store. Not implemented:
+// object_id is ignored and Status::NotImplemented is always returned.
+Status PlasmaClient::Delete(const ObjectID& object_id) {
+ // TODO(rkn): In the future, we can use this method to give hints to the
+ // eviction policy about when an object will no longer be needed.
+ return Status::NotImplemented("PlasmaClient::Delete is not implemented.");
+}
+
+// Ask the store to evict objects.
+//
+// num_bytes: number of bytes passed to the store in the evict request.
+// num_bytes_evicted: output; filled in from the store's reply.
+//
+// Returns a non-OK Status if the request cannot be sent or the reply cannot
+// be read.
+Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
+ // Send a request to the store to evict objects.
+ RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes));
+ // Wait for a response with the number of bytes actually evicted.
+ std::vector<uint8_t> buffer;
+ // NOTE(review): the received message type is stored in `type` but never
+ // inspected, unlike the other requests in this file, which go through
+ // PlasmaReceive with an expected message type. Confirm this is intended.
+ int64_t type;
+ RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
+ return ReadEvictReply(buffer.data(), num_bytes_evicted);
+}
+
+// Subscribe to notifications from the store about sealed objects.
+//
+// fd: output; set to a descriptor the caller should read notifications from.
+//   The caller owns it.
+//
+// Returns a non-OK Status if the subscribe request cannot be sent; in that
+// case both ends of the socket pair are closed and *fd is left untouched.
+// Failures of socketpair, fcntl or send_fd abort via ARROW_CHECK.
+Status PlasmaClient::Subscribe(int* fd) {
+  int sock[2];
+  // Create a socket pair. This will only be used to send notifications from
+  // the Plasma store to the client. The result is checked because on failure
+  // sock[] would be left uninitialized and used below.
+  ARROW_CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, sock) == 0) << "socketpair failed";
+  // Make the store's end of the socket non-blocking.
+  int flags = fcntl(sock[1], F_GETFL, 0);
+  ARROW_CHECK(flags >= 0);
+  ARROW_CHECK(fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0);
+  // Tell the Plasma store about the subscription. If this fails, close both
+  // ends so the descriptors are not leaked.
+  Status request_status = SendSubscribeRequest(store_conn_);
+  if (!request_status.ok()) {
+    close(sock[0]);
+    close(sock[1]);
+    return request_status;
+  }
+  // Send the file descriptor that the Plasma store should use to push
+  // notifications about sealed objects to this client.
+  ARROW_CHECK(send_fd(store_conn_, sock[1]) >= 0);
+  // The store now holds its own copy of the descriptor.
+  close(sock[1]);
+  // Return the file descriptor that the client should use to read notifications
+  // about sealed objects.
+  *fd = sock[0];
+  return Status::OK();
+}
+
+// Connect to the plasma store and, optionally, to the plasma manager.
+//
+// store_socket_name: name of the store's socket.
+// manager_socket_name: name of the manager's socket; an empty string means no
+//   manager connection is made and manager_conn_ is set to -1.
+// release_delay: stored in config_.release_delay.
+//
+// After connecting, the store is asked for its memory capacity, which is
+// stored in store_capacity_. Returns a non-OK Status if that exchange fails.
+Status PlasmaClient::Connect(const std::string& store_socket_name,
+    const std::string& manager_socket_name, int release_delay) {
+  store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
+  const bool has_manager = !manager_socket_name.empty();
+  manager_conn_ = -1;
+  if (has_manager) {
+    manager_conn_ = connect_ipc_sock_retry(manager_socket_name, -1, -1);
+  }
+  config_.release_delay = release_delay;
+  in_use_object_bytes_ = 0;
+  // Send a ConnectRequest to the store to get its memory capacity.
+  RETURN_NOT_OK(SendConnectRequest(store_conn_));
+  std::vector<uint8_t> reply;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &reply));
+  return ReadConnectReply(reply.data(), &store_capacity_);
+}
+
+// Close the connection to the store and, if one was opened, the connection to
+// the manager. Pending releases are deliberately not flushed. Always returns
+// Status::OK().
+Status PlasmaClient::Disconnect() {
+ // NOTE: We purposefully do not finish sending release calls for objects in
+ // use, so that we don't duplicate PlasmaClient::Release calls (when handling
+ // a SIGTERM, for example).
+
+ // Close the connections to Plasma. The Plasma store will release the objects
+ // that were in use by us when handling the SIGPIPE.
+ close(store_conn_);
+ // manager_conn_ is -1 when no manager connection was made.
+ if (manager_conn_ >= 0) { close(manager_conn_); }
+ return Status::OK();
+}
+
+#define h_addr h_addr_list[0]
+
+// Send a data request for object_id, with the given address and port, over
+// the manager connection and return the resulting Status.
+// NOTE(review): unlike Fetch, Info and Wait, this does not check that
+// manager_conn_ is valid before using it; confirm whether that is intended.
+Status PlasmaClient::Transfer(const char* address, int port, const ObjectID& object_id) {
+ return SendDataRequest(manager_conn_, object_id, address, port);
+}
+
+// Send a fetch request for the num_object_ids IDs in object_ids over the
+// manager connection and return the resulting Status. Aborts via ARROW_CHECK
+// if there is no manager connection.
+Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) {
+ ARROW_CHECK(manager_conn_ >= 0);
+ return SendFetchRequest(manager_conn_, object_ids, num_object_ids);
+}
+
+// Return the descriptor of the manager connection. Connect sets it to -1
+// when no manager socket name is given.
+int PlasmaClient::get_manager_fd() {
+ return manager_conn_;
+}
+
+// Query the manager for the status of a single object.
+//
+// object_id: the object to query.
+// object_status: output; filled in from the manager's status reply.
+//
+// Returns a non-OK Status if the request/reply exchange fails. Aborts via
+// ARROW_CHECK if there is no manager connection or if the reply refers to a
+// different object ID.
+Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
+ ARROW_CHECK(manager_conn_ >= 0);
+
+ // Request and read back the status of exactly one object.
+ RETURN_NOT_OK(SendStatusRequest(manager_conn_, &object_id, 1));
+ std::vector<uint8_t> buffer;
+ RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer));
+ ObjectID id;
+ RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
+ ARROW_CHECK(object_id == id);
+ return Status::OK();
+}
+
+// Send a wait request for a set of objects to the manager and count how many
+// of them are ready according to the reply.
+//
+// num_object_requests: number of entries in object_requests; must be > 0.
+// object_requests: in/out array. Each entry's type must be PLASMA_QUERY_LOCAL
+//   or PLASMA_QUERY_ANYWHERE; the entries are updated from the wait reply.
+// num_ready_objects: number of ready objects passed in the wait request; must
+//   be in [1, num_object_requests].
+// timeout_ms: timeout forwarded to the manager in the wait request.
+// num_objects_ready: output; number of requests whose status satisfies their
+//   query type after the reply.
+//
+// Returns a non-OK Status if the request/reply exchange fails; invalid
+// arguments or unexpected statuses abort via ARROW_CHECK.
+Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_requests,
+    int num_ready_objects, int64_t timeout_ms, int* num_objects_ready) {
+  ARROW_CHECK(manager_conn_ >= 0);
+  ARROW_CHECK(num_object_requests > 0);
+  ARROW_CHECK(num_ready_objects > 0);
+  ARROW_CHECK(num_ready_objects <= num_object_requests);
+
+  // Every request must use one of the two supported query types.
+  for (int i = 0; i < num_object_requests; ++i) {
+    ARROW_CHECK(object_requests[i].type == PLASMA_QUERY_LOCAL ||
+                object_requests[i].type == PLASMA_QUERY_ANYWHERE);
+  }
+
+  RETURN_NOT_OK(SendWaitRequest(manager_conn_, object_requests, num_object_requests,
+      num_ready_objects, timeout_ms));
+  std::vector<uint8_t> reply;
+  RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &reply));
+  RETURN_NOT_OK(ReadWaitReply(reply.data(), object_requests, &num_ready_objects));
+
+  // Count the requests that are satisfied according to the reply.
+  *num_objects_ready = 0;
+  for (int i = 0; i < num_object_requests; ++i) {
+    const int query_type = object_requests[i].type;
+    const int object_status = object_requests[i].status;
+    if (query_type == PLASMA_QUERY_LOCAL) {
+      if (object_status == ObjectStatus_Local) { *num_objects_ready += 1; }
+    } else if (query_type == PLASMA_QUERY_ANYWHERE) {
+      const bool available =
+          (object_status == ObjectStatus_Local || object_status == ObjectStatus_Remote);
+      if (available) {
+        *num_objects_ready += 1;
+      } else {
+        ARROW_CHECK(object_status == ObjectStatus_Nonexistent);
+      }
+    } else {
+      ARROW_LOG(FATAL) << "This code should be unreachable.";
+    }
+  }
+  return Status::OK();
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/client.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
new file mode 100644
index 0000000..fb3a161
--- /dev/null
+++ b/cpp/src/plasma/client.h
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_CLIENT_H
+#define PLASMA_CLIENT_H
+
+#include <stdbool.h>
+#include <time.h>
+
+#include <deque>
+#include <string>
+
+#include "plasma/plasma.h"
+
+using arrow::Status;
+
+#define PLASMA_DEFAULT_RELEASE_DELAY 64
+
+// Use 100MB as an overestimate of the L3 cache size.
+constexpr int64_t kL3CacheSizeBytes = 100000000;
+
+/// Object buffer data structure. Describes where the data and metadata of one
+/// object can be read; PlasmaClient::Get stores its results in an array of
+/// these.
+struct ObjectBuffer {
+ /// The size in bytes of the data object. PlasmaClient::Get documents a value
+ /// of -1 here as meaning that the object was not retrieved.
+ int64_t data_size;
+ /// The address of the data object.
+ uint8_t* data;
+ /// The metadata size in bytes.
+ int64_t metadata_size;
+ /// The address of the metadata.
+ uint8_t* metadata;
+};
+
+/// Configuration options for the plasma client.
+struct PlasmaClientConfig {
+ /// Number of release calls we wait until the object is actually released.
+ /// This allows us to avoid invalidating the cpu cache on workers if objects
+ /// are reused across tasks.
+ size_t release_delay;
+};
+
+/// Bookkeeping for one memory-mapped file; this is the value type of
+/// PlasmaClient::mmap_table_, which is keyed by file descriptor.
+struct ClientMmapTableEntry {
+ /// The result of mmap for this file descriptor.
+ uint8_t* pointer;
+ /// The length of the memory-mapped file.
+ size_t length;
+ /// The number of objects in this memory-mapped file that are currently being
+ /// used by the client. When this count reaches zero, we unmap the file.
+ int count;
+};
+
+/// Per-object usage record; this is the value type (held by unique_ptr) of
+/// PlasmaClient::objects_in_use_, which is keyed by ObjectID.
+struct ObjectInUseEntry {
+ /// A count of the number of times this client has called
+ /// PlasmaClient::Create or PlasmaClient::Get on this object ID minus the
+ /// number of calls to PlasmaClient::Release.
+ /// When this count reaches zero, we remove the entry from the ObjectsInUse
+ /// and decrement a count in the relevant ClientMmapTableEntry.
+ int count;
+ /// Cached information to read the object.
+ PlasmaObject object;
+ /// A flag representing whether the object has been sealed.
+ bool is_sealed;
+};
+
+/// Client-side handle for talking to a local Plasma store and, optionally, a
+/// local Plasma manager over UNIX domain sockets.
+class PlasmaClient {
+ public:
+ /// Connect to the local plasma store and plasma manager.
+ ///
+ /// @param store_socket_name The name of the UNIX domain socket to use to
+ /// connect to the Plasma store.
+ /// @param manager_socket_name The name of the UNIX domain socket to use to
+ /// connect to the local Plasma manager. If this is "", then this
+ /// function will not connect to a manager.
+ /// @param release_delay Number of released objects that are kept around
+ /// and not evicted to avoid too many munmaps.
+ /// @return The return status.
+ Status Connect(const std::string& store_socket_name,
+ const std::string& manager_socket_name, int release_delay);
+
+ /// Create an object in the Plasma Store. Any metadata for this object must
+ /// be passed in when the object is created.
+ ///
+ /// @param object_id The ID to use for the newly created object.
+ /// @param data_size The size in bytes of the space to be allocated for this
+ /// object's data (this does not include space used for metadata).
+ /// @param metadata The object's metadata. If there is no metadata, this
+ /// pointer should be NULL.
+ /// @param metadata_size The size in bytes of the metadata. If there is no
+ /// metadata, this should be 0.
+ /// @param data The address of the newly created object will be written here.
+ /// @return The return status.
+ Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata,
+ int64_t metadata_size, uint8_t** data);
+
+ /// Get some objects from the Plasma Store. This function will block until the
+ /// objects have all been created and sealed in the Plasma Store or the
+ /// timeout expires. The caller is responsible for releasing any retrieved
+ /// objects, but the caller should not release objects that were not
+ /// retrieved.
+ ///
+ /// @param object_ids The IDs of the objects to get.
+ /// @param num_objects The number of object IDs to get.
+ /// @param timeout_ms The amount of time in milliseconds to wait before this
+ /// request times out. If this value is -1, then no timeout is set.
+ /// @param object_buffers An array where the results will be stored. If the
+ /// data size field is -1, then the object was not retrieved.
+ /// @return The return status.
+ Status Get(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
+ ObjectBuffer* object_buffers);
+
+ /// Tell Plasma that the client no longer needs the object. This should be
+ /// called after Get when the client is done with the object. After this
+ /// call, the address returned by Get is no longer valid. This should be
+ /// called once for each call to Get (with the same object ID).
+ ///
+ /// @param object_id The ID of the object that is no longer needed.
+ /// @return The return status.
+ Status Release(const ObjectID& object_id);
+
+ /// Check if the object store contains a particular object and the object has
+ /// been sealed. The result will be stored in has_object.
+ ///
+ /// @todo: We may want to indicate if the object has been created but not
+ /// sealed.
+ ///
+ /// @param object_id The ID of the object whose presence we are checking.
+ /// @param has_object The function will write true at this address if
+ /// the object is present and false if it is not present.
+ /// @return The return status.
+ Status Contains(const ObjectID& object_id, bool* has_object);
+
+ /// Seal an object in the object store. The object will be immutable after
+ /// this call.
+ ///
+ /// @param object_id The ID of the object to seal.
+ /// @return The return status.
+ Status Seal(const ObjectID& object_id);
+
+ /// Delete an object from the object store. This currently assumes that the
+ /// object is present and has been sealed.
+ ///
+ /// @todo We may want to allow the deletion of objects that are not present or
+ /// haven't been sealed.
+ ///
+ /// @param object_id The ID of the object to delete.
+ /// @return The return status.
+ Status Delete(const ObjectID& object_id);
+
+ /// Delete objects until we have freed up num_bytes bytes or there are no more
+ /// released objects that can be deleted.
+ ///
+ /// @param num_bytes The number of bytes to try to free up.
+ /// @param num_bytes_evicted Out parameter for total number of bytes of space
+ /// retrieved.
+ /// @return The return status.
+ Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);
+
+ /// Subscribe to notifications when objects are sealed in the object store.
+ /// Whenever an object is sealed, a message will be written to the client
+ /// socket that is returned by this method.
+ ///
+ /// @param fd Out parameter for the file descriptor the client should use to
+ /// read notifications from the object store about sealed objects.
+ /// @return The return status.
+ Status Subscribe(int* fd);
+
+ /// Disconnect from the local plasma instance, including the local store and
+ /// manager.
+ ///
+ /// @return The return status.
+ Status Disconnect();
+
+ /// Attempt to initiate the transfer of some objects from remote Plasma
+ /// Stores. This method does not guarantee that the fetched objects will
+ /// arrive locally.
+ ///
+ /// For an object that is available in the local Plasma Store, this method
+ /// will not do anything. For an object that is not available locally, it
+ /// will check if the object is already being fetched. If so, it will not do
+ /// anything. If not, it will query the object table for a list of Plasma
+ /// Managers that have the object. The object table will return a non-empty
+ /// list, and this Plasma Manager will attempt to initiate transfers from one
+ /// of those Plasma Managers.
+ ///
+ /// This function is non-blocking.
+ ///
+ /// This method is idempotent in the sense that it is ok to call it multiple
+ /// times.
+ ///
+ /// @param num_object_ids The number of object IDs fetch is being called on.
+ /// @param object_ids The IDs of the objects that fetch is being called on.
+ /// @return The return status.
+ Status Fetch(int num_object_ids, const ObjectID* object_ids);
+
+ /// Wait for (1) a specified number of objects to be available (sealed) in the
+ /// local Plasma Store or in a remote Plasma Store, or (2) for a timeout to
+ /// expire. This is a blocking call.
+ ///
+ /// @param num_object_requests Size of the object_requests array.
+ /// @param object_requests Object event array. Each element contains a request
+ /// for a particular object_id. The type of request is specified in the
+ /// "type" field.
+ /// - A PLASMA_QUERY_LOCAL request is satisfied when object_id becomes
+ /// available in the local Plasma Store. In this case, this function
+ /// sets the "status" field to ObjectStatus_Local. Note, if the
+ /// status is not ObjectStatus_Local, it will be
+ /// ObjectStatus_Nonexistent, but it may exist elsewhere in the
+ /// system.
+ /// - A PLASMA_QUERY_ANYWHERE request is satisfied when object_id
+ /// becomes available either at the local Plasma Store or on a remote
+ /// Plasma Store. In this case, the function sets the "status" field
+ /// to ObjectStatus_Local or ObjectStatus_Remote.
+ /// @param num_ready_objects The number of requests in object_requests array
+ /// that must be satisfied before the function returns, unless it times
+ /// out. The num_ready_objects should be no larger than
+ /// num_object_requests.
+ /// @param timeout_ms Timeout value in milliseconds. If this timeout expires
+ /// before num_ready_objects of the requests are satisfied, the function
+ /// returns.
+ /// @param num_objects_ready Out parameter for number of satisfied requests in
+ /// the object_requests list. If the returned number is less than
+ /// num_ready_objects this means that timeout expired.
+ /// @return The return status.
+ Status Wait(int64_t num_object_requests, ObjectRequest* object_requests,
+ int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
+
+ /// Transfer local object to a different plasma manager.
+ ///
+ /// @param addr IP address of the plasma manager we are transferring to.
+ /// @param port Port of the plasma manager we are transferring to.
+ /// @param object_id ObjectID of the object we are transferring.
+ /// @return The return status.
+ Status Transfer(const char* addr, int port, const ObjectID& object_id);
+
+ /// Return the status of a given object. This method may query the object
+ /// table.
+ ///
+ /// @param object_id The ID of the object whose status we query.
+ /// @param object_status Out parameter for object status. Can take the
+ /// following values.
+ /// - PLASMA_CLIENT_LOCAL, if object is stored in the local Plasma
+ /// Store.
+ /// - PLASMA_CLIENT_TRANSFER, if the object is either currently being
+ /// transferred or has already been scheduled for transfer by the
+ /// Plasma Manager.
+ /// - PLASMA_CLIENT_REMOTE, if the object is stored at a remote
+ /// Plasma Store.
+ /// - PLASMA_CLIENT_DOES_NOT_EXIST, if the object doesn't exist in the
+ /// system.
+ /// NOTE(review): PlasmaClient::Wait reports statuses as ObjectStatus_*
+ /// values; confirm whether the PLASMA_CLIENT_* names above are current.
+ /// @return The return status.
+ Status Info(const ObjectID& object_id, int* object_status);
+
+ /// Get the file descriptor for the socket connection to the plasma manager.
+ ///
+ /// @return The file descriptor for the manager connection. If there is no
+ /// connection to the manager, this is -1.
+ int get_manager_fd();
+
+ private:
+ // NOTE(review): the four helpers below are defined in client.cc; their
+ // contracts are not documented here -- see the definitions.
+ Status PerformRelease(const ObjectID& object_id);
+
+ uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
+
+ uint8_t* lookup_mmapped_file(int store_fd_val);
+
+ void increment_object_count(
+ const ObjectID& object_id, PlasmaObject* object, bool is_sealed);
+
+ /// File descriptor of the Unix domain socket that connects to the store.
+ int store_conn_;
+ /// File descriptor of the Unix domain socket that connects to the manager.
+ int manager_conn_;
+ /// Table of dlmalloc buffer files that have been memory mapped so far. This
+ /// is a hash table mapping a file descriptor to a struct containing the
+ /// address of the corresponding memory-mapped file.
+ std::unordered_map<int, ClientMmapTableEntry> mmap_table_;
+ /// A hash table of the object IDs that are currently being used by this
+ /// client.
+ std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>, UniqueIDHasher>
+ objects_in_use_;
+ /// Object IDs of the last few release calls. This is a deque and
+ /// is used to delay releasing objects to see if they can be reused by
+ /// subsequent tasks so we do not unnecessarily invalidate cpu caches.
+ /// TODO(pcm): replace this with a proper lru cache using the size of the L3
+ /// cache.
+ std::deque<ObjectID> release_history_;
+ /// The number of bytes in the combined objects that are held in the release
+ /// history (release_history_). If this is too large then the client starts
+ /// releasing objects.
+ int64_t in_use_object_bytes_;
+ /// Configuration options for the plasma client.
+ PlasmaClientConfig config_;
+ /// The amount of memory available to the Plasma store. The client needs this
+ /// information to make sure that it does not delay in releasing so much
+ /// memory that the store is unable to evict enough objects to free up space.
+ int64_t store_capacity_;
+};
+
+/// Compute the hash of an object in the object store.
+///
+/// @param conn The client holding the connection state.
+/// @param object_id The ID of the object we want to hash.
+/// @param digest A pointer at which to return the hash digest of the object.
+/// The pointer must have at least DIGEST_SIZE bytes allocated.
+/// @return A boolean representing whether the hash operation succeeded.
+bool plasma_compute_object_hash(
+ PlasmaClient* conn, ObjectID object_id, unsigned char* digest);
+
+#endif // PLASMA_CLIENT_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/common.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
new file mode 100644
index 0000000..a09a963
--- /dev/null
+++ b/cpp/src/plasma/common.cc
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/common.h"
+
+#include <random>
+
+#include "format/plasma_generated.h"
+
+using arrow::Status;
+
+/// Create an ID whose kUniqueIDSize bytes are each drawn from
+/// std::random_device.
+UniqueID UniqueID::from_random() {
+  std::random_device entropy;
+  UniqueID result;
+  uint8_t* bytes = result.mutable_data();
+  for (int pos = 0; pos < kUniqueIDSize; ++pos) {
+    // Each draw is truncated to its low 8 bits.
+    bytes[pos] = static_cast<uint8_t>(entropy());
+  }
+  return result;
+}
+
+/// Build a UniqueID from its raw binary representation.
+///
+/// @param binary A string holding at least sizeof(UniqueID) bytes. The first
+///        sizeof(UniqueID) bytes are copied into the ID; any bytes beyond
+///        that are ignored.
+/// @return The reconstructed ID. The process aborts via ARROW_CHECK if
+///         `binary` is too short.
+UniqueID UniqueID::from_binary(const std::string& binary) {
+  // The memcpy below reads sizeof(id) bytes unconditionally, so a shorter
+  // string would be read past the end of its buffer.
+  ARROW_CHECK(binary.size() >= sizeof(UniqueID));
+  UniqueID id;
+  std::memcpy(&id, binary.data(), sizeof(id));
+  return id;
+}
+
+/// Read-only pointer to the first of the kUniqueIDSize bytes of this ID.
+const uint8_t* UniqueID::data() const {
+  return &id_[0];
+}
+
+/// Writable pointer to the first of the kUniqueIDSize bytes of this ID.
+uint8_t* UniqueID::mutable_data() {
+  return &id_[0];
+}
+
+/// Copy the raw kUniqueIDSize bytes of this ID into a std::string.
+std::string UniqueID::binary() const {
+  const char* raw_bytes = reinterpret_cast<const char*>(id_);
+  return std::string(raw_bytes, kUniqueIDSize);
+}
+
+/// Render this ID as lowercase hexadecimal, two characters per byte with the
+/// high nibble first.
+std::string UniqueID::hex() const {
+  constexpr char kHexDigits[] = "0123456789abcdef";
+  std::string encoded;
+  for (const uint8_t byte : id_) {
+    const unsigned int value = byte;
+    encoded.push_back(kHexDigits[value >> 4]);
+    encoded.push_back(kHexDigits[value & 0xf]);
+  }
+  return encoded;
+}
+
+/// Two IDs are equal when all kUniqueIDSize bytes match.
+bool UniqueID::operator==(const UniqueID& rhs) const {
+  const int difference = std::memcmp(data(), rhs.data(), kUniqueIDSize);
+  return difference == 0;
+}
+
+/// Translate a PlasmaError_* code into the corresponding arrow::Status.
+///
+/// An unrecognised code is logged at FATAL severity; the trailing
+/// Status::OK() return follows that log statement.
+Status plasma_error_status(int plasma_error) {
+  if (plasma_error == PlasmaError_OK) { return Status::OK(); }
+  if (plasma_error == PlasmaError_ObjectExists) {
+    return Status::PlasmaObjectExists("object already exists in the plasma store");
+  }
+  if (plasma_error == PlasmaError_ObjectNonexistent) {
+    return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
+  }
+  if (plasma_error == PlasmaError_OutOfMemory) {
+    return Status::PlasmaStoreFull("object does not fit in the plasma store");
+  }
+  ARROW_LOG(FATAL) << "unknown plasma error code " << plasma_error;
+  return Status::OK();
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/common.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
new file mode 100644
index 0000000..85dc74b
--- /dev/null
+++ b/cpp/src/plasma/common.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_COMMON_H
+#define PLASMA_COMMON_H
+
+#include <cstring>
+#include <string>
+// TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++
+// and get rid of the next three lines:
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+constexpr int64_t kUniqueIDSize = 20;
+
+/// A fixed-size identifier of kUniqueIDSize raw bytes. The class is kept
+/// plain-old-data (see the static_assert that follows it).
+class UniqueID {
+ public:
+ /// Create an ID filled with bytes drawn from std::random_device.
+ static UniqueID from_random();
+ /// Create an ID by copying its bytes out of `binary`.
+ static UniqueID from_binary(const std::string& binary);
+ /// Byte-wise equality over all kUniqueIDSize bytes.
+ bool operator==(const UniqueID& rhs) const;
+ /// Read-only pointer to the underlying bytes.
+ const uint8_t* data() const;
+ /// Writable pointer to the underlying bytes.
+ uint8_t* mutable_data();
+ /// The raw bytes as a std::string of length kUniqueIDSize.
+ std::string binary() const;
+ /// The bytes as a lowercase hexadecimal string.
+ std::string hex() const;
+
+ private:
+ /// The identifier bytes.
+ uint8_t id_[kUniqueIDSize];
+};
+
+static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");
+
+/// Hash functor that lets UniqueID / ObjectID be used as a key in unordered
+/// containers.
+struct UniqueIDHasher {
+  /// Use the leading sizeof(size_t) bytes of the ID as its hash value.
+  size_t operator()(const UniqueID& id) const {
+    size_t hash_value;
+    std::memcpy(&hash_value, id.data(), sizeof(hash_value));
+    return hash_value;
+  }
+};
+
+typedef UniqueID ObjectID;
+
+arrow::Status plasma_error_status(int plasma_error);
+
+#endif // PLASMA_COMMON_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/events.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc
new file mode 100644
index 0000000..a9f7356
--- /dev/null
+++ b/cpp/src/plasma/events.cc
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/events.h"
+
+#include <errno.h>
+
+/// Trampoline handed to ae for file events: `context` is the FileCallback
+/// registered for the descriptor, which is invoked with the event flags.
+void EventLoop::file_event_callback(
+    aeEventLoop* loop, int fd, void* context, int events) {
+  FileCallback& handler = *static_cast<FileCallback*>(context);
+  handler(events);
+}
+
+/// Trampoline handed to ae for timer events: `context` is the TimerCallback
+/// registered for the timer; its return value is passed straight back to ae.
+int EventLoop::timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context) {
+  TimerCallback& handler = *static_cast<TimerCallback*>(context);
+  return handler(timer_id);
+}
+
+constexpr int kInitialEventLoopSize = 1024;
+
+/// Create the underlying ae event loop with room for kInitialEventLoopSize
+/// file descriptors.
+EventLoop::EventLoop() : loop_(aeCreateEventLoop(kInitialEventLoopSize)) {}
+
+/// Register `callback` to be invoked when `events` occur on `fd`.
+///
+/// If ae rejects the descriptor with errno == ERANGE, the event loop is grown
+/// and the registration is retried once. The new size is the larger of 1.5x
+/// the current set size and fd + 1, so that the retry cannot fail again merely
+/// because `fd` is still beyond the end of the set.
+///
+/// @param fd The file descriptor to listen on.
+/// @param events The event flags to listen for (read or write).
+/// @param callback Copied and owned by this EventLoop on success.
+/// @return true if the handler was registered; false if `fd` already has a
+///         handler, the resize failed, or ae rejected the event.
+bool EventLoop::add_file_event(int fd, int events, const FileCallback& callback) {
+  if (file_callbacks_.find(fd) != file_callbacks_.end()) { return false; }
+  auto data = std::unique_ptr<FileCallback>(new FileCallback(callback));
+  void* context = reinterpret_cast<void*>(data.get());
+  // Try to add the file descriptor.
+  int err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
+  // If it cannot be added, increase the size of the event loop.
+  if (err == AE_ERR && errno == ERANGE) {
+    int new_size = 3 * aeGetSetSize(loop_) / 2;
+    // A single 1.5x step may still be too small for a large descriptor.
+    if (new_size <= fd) { new_size = fd + 1; }
+    err = aeResizeSetSize(loop_, new_size);
+    if (err != AE_OK) { return false; }
+    err = aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
+  }
+  // In any case, test if there were errors.
+  if (err == AE_OK) {
+    // Ownership moves into the table so `context` stays valid while registered.
+    file_callbacks_.emplace(fd, std::move(data));
+    return true;
+  }
+  return false;
+}
+
+/// Unregister `fd` for both readable and writable events and drop the
+/// callback stored for it.
+void EventLoop::remove_file_event(int fd) {
+  const int all_events = AE_READABLE | AE_WRITABLE;
+  aeDeleteFileEvent(loop_, fd, all_events);
+  file_callbacks_.erase(fd);
+}
+
+/// Run the event loop by handing control to aeMain on the underlying loop.
+void EventLoop::run() {
+ aeMain(loop_);
+}
+
+/// Register a timer with ae and keep a copy of `callback` alive under the
+/// returned timer ID.
+///
+/// @param timeout The timeout in milliseconds.
+/// @param callback The handler to invoke when the timer fires.
+/// @return The ID that aeCreateTimeEvent assigned to the timer.
+int64_t EventLoop::add_timer(int64_t timeout, const TimerCallback& callback) {
+  std::unique_ptr<TimerCallback> owned(new TimerCallback(callback));
+  void* context = reinterpret_cast<void*>(owned.get());
+  const int64_t timer_id =
+      aeCreateTimeEvent(loop_, timeout, EventLoop::timer_event_callback, context, NULL);
+  // The table owns the callback so `context` stays valid while the timer exists.
+  timer_callbacks_.emplace(timer_id, std::move(owned));
+  return timer_id;
+}
+
+/// Delete the timer from ae and drop the callback stored for it.
+///
+/// @return The result code of aeDeleteTimeEvent.
+int EventLoop::remove_timer(int64_t timer_id) {
+  const int ae_status = aeDeleteTimeEvent(loop_, timer_id);
+  timer_callbacks_.erase(timer_id);
+  return ae_status;
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/events.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h
new file mode 100644
index 0000000..bd93d6b
--- /dev/null
+++ b/cpp/src/plasma/events.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_EVENTS
+#define PLASMA_EVENTS
+
+#include <functional>
+#include <memory>
+#include <unordered_map>
+
+extern "C" {
+#include "ae/ae.h"
+}
+
+/// Constant specifying that the timer is done and it will be removed.
+constexpr int kEventLoopTimerDone = AE_NOMORE;
+
+/// Read event on the file descriptor.
+constexpr int kEventLoopRead = AE_READABLE;
+
+/// Write event on the file descriptor.
+constexpr int kEventLoopWrite = AE_WRITABLE;
+
+typedef long long TimerID; // NOLINT
+
+/// C++ wrapper around an ae event loop that stores std::function callbacks for
+/// file descriptors and timers and dispatches ae events to them.
+class EventLoop {
+ public:
+ /// Signature of the handler that will be called when there is a new event
+ /// on the file descriptor that this handler has been registered for.
+ ///
+ /// The arguments are the event flags (read or write).
+ using FileCallback = std::function<void(int)>;
+
+ /// This handler will be called when a timer times out. The timer id is
+ /// passed as an argument. The return is the number of milliseconds the timer
+ /// shall be reset to or kEventLoopTimerDone if the timer shall not be
+ /// triggered again.
+ using TimerCallback = std::function<int(int64_t)>;
+
+ /// Create the underlying ae event loop.
+ EventLoop();
+
+ /// Add a new file event handler to the event loop.
+ ///
+ /// @param fd The file descriptor we are listening to.
+ /// @param events The flags for events we are listening to (read or write).
+ /// @param callback The callback that will be called when the event happens.
+ /// @return Returns true if the event handler was added successfully.
+ bool add_file_event(int fd, int events, const FileCallback& callback);
+
+ /// Remove a file event handler from the event loop.
+ ///
+ /// @param fd The file descriptor of the event handler.
+ /// @return Void.
+ void remove_file_event(int fd);
+
+ /// Register a handler that will be called after a time slice of
+ /// "timeout" milliseconds.
+ ///
+ /// @param timeout The timeout in milliseconds.
+ /// @param callback The callback for the timeout.
+ /// @return The ID of the newly created timer.
+ int64_t add_timer(int64_t timeout, const TimerCallback& callback);
+
+ /// Remove a timer handler from the event loop.
+ ///
+ /// @param timer_id The ID of the timer that is to be removed.
+ /// @return The ae.c error code. TODO(pcm): needs to be standardized
+ int remove_timer(int64_t timer_id);
+
+ /// Run the event loop.
+ ///
+ /// @return Void.
+ void run();
+
+ private:
+ /// ae file-event trampoline; `context` points at the registered FileCallback.
+ static void file_event_callback(aeEventLoop* loop, int fd, void* context, int events);
+
+ /// ae timer trampoline; `context` points at the registered TimerCallback.
+ static int timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context);
+
+ /// The underlying ae event loop.
+ aeEventLoop* loop_;
+ /// Owns the callback registered for each file descriptor.
+ std::unordered_map<int, std::unique_ptr<FileCallback>> file_callbacks_;
+ /// Owns the callback registered for each timer ID.
+ std::unordered_map<int64_t, std::unique_ptr<TimerCallback>> timer_callbacks_;
+};
+
+#endif // PLASMA_EVENTS
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/eviction_policy.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
new file mode 100644
index 0000000..4ae6384
--- /dev/null
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/eviction_policy.h"
+
+#include <algorithm>
+
+/// Insert `key` with the given size at the front of the LRU list. The key
+/// must not already be in the cache (enforced with ARROW_CHECK).
+void LRUCache::add(const ObjectID& key, int64_t size) {
+  auto it = item_map_.find(key);
+  ARROW_CHECK(it == item_map_.end());
+  // A std::list is used so that the iterators stored in item_map_ stay valid
+  // as other items are inserted and erased.
+  item_list_.emplace_front(key, size);
+  item_map_.insert({key, item_list_.begin()});
+}
+
+/// Remove `key` from both the LRU list and the lookup table. The key must be
+/// in the cache (enforced with ARROW_CHECK).
+void LRUCache::remove(const ObjectID& key) {
+  auto it = item_map_.find(key);
+  ARROW_CHECK(it != item_map_.end());
+  const ItemList::iterator list_position = it->second;
+  item_map_.erase(it);
+  item_list_.erase(list_position);
+}
+
+/// Walk the list from its back (items are added at the front) and append
+/// object IDs to objects_to_evict until at least num_bytes_required bytes
+/// have been selected or the list is exhausted. The cache is not modified.
+///
+/// @return The total size in bytes of the selected objects.
+int64_t LRUCache::choose_objects_to_evict(
+    int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) {
+  int64_t bytes_evicted = 0;
+  for (auto candidate = item_list_.rbegin();
+       bytes_evicted < num_bytes_required && candidate != item_list_.rend();
+       ++candidate) {
+    objects_to_evict->push_back(candidate->first);
+    bytes_evicted += candidate->second;
+  }
+  return bytes_evicted;
+}
+
+/// Construct an eviction policy that starts with zero bytes in use and reads
+/// object and capacity information from `store_info`.
+EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info)
+ : memory_used_(0), store_info_(store_info) {}
+
+/// Ask the LRU cache which objects to evict, remove every ID in
+/// objects_to_evict from the cache, and reduce the tracked memory usage by
+/// the number of bytes selected.
+///
+/// @return The number of bytes selected for eviction.
+int64_t EvictionPolicy::choose_objects_to_evict(
+    int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) {
+  const int64_t freed_bytes =
+      cache_.choose_objects_to_evict(num_bytes_required, objects_to_evict);
+  // Update the LRU cache.
+  for (auto it = objects_to_evict->begin(); it != objects_to_evict->end(); ++it) {
+    cache_.remove(*it);
+  }
+  // Update the number of bytes used.
+  memory_used_ -= freed_bytes;
+  return freed_bytes;
+}
+
+/// Add a newly created object to the LRU cache, sized as its data size plus
+/// its metadata size as recorded in the store info.
+void EvictionPolicy::object_created(const ObjectID& object_id) {
+  const auto& info = store_info_->objects[object_id]->info;
+  const auto total_size = info.data_size + info.metadata_size;
+  cache_.add(object_id, total_size);
+}
+
+/// Try to make room for an object of `size` bytes.
+///
+/// If the object does not fit within the store's memory capacity, objects are
+/// chosen for eviction and appended to objects_to_evict. memory_used_ is
+/// increased by `size` only when enough space is available.
+///
+/// @return true if there is enough space for the object, false otherwise.
+bool EvictionPolicy::require_space(
+    int64_t size, std::vector<ObjectID>* objects_to_evict) {
+  // How far over capacity the store would be after creating the object.
+  const int64_t required_space = memory_used_ + size - store_info_->memory_capacity;
+  int64_t num_bytes_evicted = 0;
+  if (required_space > 0) {
+    // Try to free up at least as much space as we need right now but ideally
+    // up to 20% of the total capacity.
+    const int64_t space_to_free = std::max(size, store_info_->memory_capacity / 5);
+    ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects";
+    // Choose some objects to evict, and update the return pointers.
+    num_bytes_evicted = choose_objects_to_evict(space_to_free, objects_to_evict);
+    ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting "
+                    << objects_to_evict->size() << " objects to free up "
+                    << num_bytes_evicted << " bytes.";
+  }
+  const bool has_enough_space = num_bytes_evicted >= required_space;
+  // We only increment the space used if there is enough space to create the
+  // object.
+  if (has_enough_space) { memory_used_ += size; }
+  return has_enough_space;
+}
+
+/// Called when an object starts being accessed: take it out of the LRU cache.
+/// objects_to_evict is not used by this implementation.
+void EvictionPolicy::begin_object_access(
+ const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) {
+ /* Remove the object from the LRU cache. Note that LRUCache::remove checks
+ * (ARROW_CHECK) that the object is present, so the object must currently be
+ * in the cache. */
+ cache_.remove(object_id);
+}
+
+/// Called when an object stops being accessed: put it back into the LRU
+/// cache, sized as its data size plus its metadata size. objects_to_evict is
+/// not used by this implementation.
+void EvictionPolicy::end_object_access(
+    const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) {
+  const auto& info = store_info_->objects[object_id]->info;
+  // Add the object to the LRU cache.
+  cache_.add(object_id, info.data_size + info.metadata_size);
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/eviction_policy.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
new file mode 100644
index 0000000..3815fc6
--- /dev/null
+++ b/cpp/src/plasma/eviction_policy.h
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_EVICTION_POLICY_H
+#define PLASMA_EVICTION_POLICY_H
+
+#include <list>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "plasma/common.h"
+#include "plasma/plasma.h"
+
+// ==== The eviction policy ====
+//
+// This file contains declaration for all functions and data structures that
+// need to be provided if you want to implement a new eviction algorithm for the
+// Plasma store.
+
+ /// Bookkeeping structure that keeps object IDs together with a size, in LRU
+ /// order. EvictionPolicy adds an object when it stops being used and removes
+ /// it when it starts being used.
+ ///
+ /// NOTE(review): the member function definitions are not in this header; the
+ /// per-method notes below describe the declared interface and how
+ /// EvictionPolicy calls it -- confirm details against the implementation.
+ class LRUCache {
+ public:
+ /// Construct an empty cache.
+ LRUCache() {}
+
+ /// Record an object in the cache.
+ ///
+ /// @param key The ID of the object to add.
+ /// @param size The size in bytes to associate with the object. EvictionPolicy
+ /// passes the object's data size plus its metadata size.
+ void add(const ObjectID& key, int64_t size);
+
+ /// Remove an object from the cache.
+ ///
+ /// @param key The ID of the object to remove.
+ void remove(const ObjectID& key);
+
+ /// Select objects for eviction.
+ ///
+ /// @param num_bytes_required The number of bytes the caller would like freed.
+ /// @param objects_to_evict Output vector receiving the chosen object IDs.
+ /// @return Presumably the total size in bytes of the chosen objects -- TODO
+ /// confirm against the implementation.
+ int64_t choose_objects_to_evict(
+ int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict);
+
+ private:
+ /// A doubly-linked list containing the items in the cache and
+ /// their sizes in LRU order.
+ typedef std::list<std::pair<ObjectID, int64_t>> ItemList;
+ ItemList item_list_;
+ /// A hash table mapping the object ID of an object in the cache to its
+ /// location in the doubly linked list item_list_.
+ std::unordered_map<ObjectID, ItemList::iterator, UniqueIDHasher> item_map_;
+ };
+
+ /// The eviction policy. Tracks how much memory the store is using and keeps
+ /// the currently unused objects in an LRUCache so that they can be chosen for
+ /// eviction when space is needed.
+ class EvictionPolicy {
+ public:
+ /// Construct an eviction policy.
+ ///
+ /// @param store_info Information about the Plasma store that is exposed
+ /// to the eviction policy.
+ explicit EvictionPolicy(PlasmaStoreInfo* store_info);
+
+ /// This method will be called whenever an object is first created in order to
+ /// add it to the LRU cache. This is done so that the first time, the Plasma
+ /// store calls begin_object_access, we can remove the object from the LRU
+ /// cache.
+ ///
+ /// @param object_id The object ID of the object that was created.
+ /// @return Void.
+ void object_created(const ObjectID& object_id);
+
+ /// This method will be called when the Plasma store needs more space, perhaps
+ /// to create a new object. If the store would exceed its capacity, objects
+ /// are chosen for eviction. If the required amount of space cannot be freed
+ /// up, the method returns false and the requested size is not added to the
+ /// memory accounting; no error is raised by this method itself. When this
+ /// method is called, the eviction policy will assume that the objects chosen
+ /// to be evicted will in fact be evicted from the Plasma store by the caller.
+ ///
+ /// @param size The size in bytes of the new object, including both data and
+ /// metadata.
+ /// @param objects_to_evict The object IDs that were chosen for eviction will
+ /// be stored into this vector.
+ /// @return True if enough space can be freed and false otherwise.
+ bool require_space(int64_t size, std::vector<ObjectID>* objects_to_evict);
+
+ /// This method will be called whenever an unused object in the Plasma store
+ /// starts to be used. The object is removed from the LRU cache.
+ ///
+ /// @param object_id The ID of the object that is now being used.
+ /// @param objects_to_evict Output vector for object IDs chosen for eviction.
+ /// The current implementation does not choose any objects here and leaves
+ /// this vector untouched.
+ /// @return Void.
+ void begin_object_access(
+ const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict);
+
+ /// This method will be called whenever an object in the Plasma store that was
+ /// being used is no longer being used. The object is added back to the LRU
+ /// cache with a size equal to its data size plus its metadata size.
+ ///
+ /// @param object_id The ID of the object that is no longer being used.
+ /// @param objects_to_evict Output vector for object IDs chosen for eviction.
+ /// The current implementation does not choose any objects here and leaves
+ /// this vector untouched.
+ /// @return Void.
+ void end_object_access(
+ const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict);
+
+ /// Choose some objects to evict from the Plasma store. When this method is
+ /// called, the eviction policy will assume that the objects chosen to be
+ /// evicted will in fact be evicted from the Plasma store by the caller.
+ ///
+ /// @note This method is not part of the API. It is exposed in the header file
+ /// only for testing.
+ ///
+ /// @param num_bytes_required The number of bytes of space to try to free up.
+ /// @param objects_to_evict The object IDs that were chosen for eviction will
+ /// be stored into this vector.
+ /// @return The total number of bytes of space chosen to be evicted.
+ int64_t choose_objects_to_evict(
+ int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict);
+
+ private:
+ /// The amount of memory (in bytes) currently being used. require_space adds
+ /// the size of each object it admits.
+ int64_t memory_used_;
+ /// Pointer to the plasma store info. Read for the memory capacity and the
+ /// object table; NOTE(review): ownership is not visible from this header.
+ PlasmaStoreInfo* store_info_;
+ /// Datastructure for the LRU cache.
+ LRUCache cache_;
+ };
+
+#endif // PLASMA_EVICTION_POLICY_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/extension.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.cc b/cpp/src/plasma/extension.cc
new file mode 100644
index 0000000..5d61e33
--- /dev/null
+++ b/cpp/src/plasma/extension.cc
@@ -0,0 +1,456 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/extension.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "plasma/client.h"
+#include "plasma/common.h"
+#include "plasma/io.h"
+#include "plasma/protocol.h"
+
+ /* Python exception objects exported by this module. Both are created in the
+  * module init function and raised by PyPlasma_create: the first when the
+  * store reports that it is full, the second when the object ID already
+  * exists in the store. */
+ PyObject* PlasmaOutOfMemoryError;
+ PyObject* PlasmaObjectExistsError;
+
+ /* connect(store_socket_name, manager_socket_name, release_delay) -> capsule
+  *
+  * Creates a PlasmaClient, connects it, and hands it back to Python wrapped
+  * in a capsule named "plasma". A failed connection trips ARROW_CHECK_OK. */
+ PyObject* PyPlasma_connect(PyObject* self, PyObject* args) {
+   const char* store_socket_name;
+   const char* manager_socket_name;
+   int release_delay;
+   const int parsed = PyArg_ParseTuple(
+       args, "ssi", &store_socket_name, &manager_socket_name, &release_delay);
+   if (!parsed) { return NULL; }
+
+   PlasmaClient* connected_client = new PlasmaClient();
+   ARROW_CHECK_OK(
+       connected_client->Connect(store_socket_name, manager_socket_name, release_delay));
+
+   /* No capsule destructor is registered. */
+   return PyCapsule_New(connected_client, "plasma", NULL);
+ }
+
+ /* disconnect(client_capsule) -> None
+  *
+  * Disconnects the client held by the capsule and marks the capsule as
+  * closed. */
+ PyObject* PyPlasma_disconnect(PyObject* self, PyObject* args) {
+   PyObject* client_capsule;
+   const int parsed = PyArg_ParseTuple(args, "O", &client_capsule);
+   if (!parsed) { return NULL; }
+
+   PlasmaClient* client;
+   ARROW_CHECK(PyObjectToPlasmaClient(client_capsule, &client));
+   ARROW_CHECK_OK(client->Disconnect());
+
+   /* The capsule's primary pointer cannot be NULL, so the capsule context is
+    * used as the "closed" flag instead: a NULL context means the connection
+    * is still active, a context of (void*) 0x1 means it has been closed. */
+   void* closed_marker = reinterpret_cast<void*>(0x1);
+   PyCapsule_SetContext(client_capsule, closed_marker);
+   Py_RETURN_NONE;
+ }
+
+ /* create(client, object_id, size, metadata) -> writable buffer
+  *
+  * Creates an object of `size` bytes with the given bytearray metadata and
+  * returns a writable view over its data. Raises PlasmaObjectExistsError or
+  * PlasmaOutOfMemoryError for those two store statuses; any other failure
+  * trips ARROW_CHECK. */
+ PyObject* PyPlasma_create(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   ObjectID object_id;
+   Py_ssize_t size;
+   PyObject* metadata;
+   const int parsed = PyArg_ParseTuple(args, "O&O&nO", PyObjectToPlasmaClient, &client,
+       PyStringToUniqueID, &object_id, &size, &metadata);
+   if (!parsed) { return NULL; }
+
+   if (!PyByteArray_Check(metadata)) {
+     PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray");
+     return NULL;
+   }
+
+   uint8_t* metadata_bytes = reinterpret_cast<uint8_t*>(PyByteArray_AsString(metadata));
+   const Py_ssize_t metadata_length = PyByteArray_Size(metadata);
+   uint8_t* data;
+   Status status = client->Create(object_id, size, metadata_bytes, metadata_length, &data);
+
+   /* Translate the two expected failure modes into Python exceptions. */
+   if (status.IsPlasmaObjectExists()) {
+     PyErr_SetString(PlasmaObjectExistsError,
+         "An object with this ID already exists in the plasma "
+         "store.");
+     return NULL;
+   } else if (status.IsPlasmaStoreFull()) {
+     PyErr_SetString(PlasmaOutOfMemoryError,
+         "The plasma store ran out of memory and could not create "
+         "this object.");
+     return NULL;
+   }
+   ARROW_CHECK(status.ok());
+
+ #if PY_MAJOR_VERSION >= 3
+   char* writable = reinterpret_cast<char*>(data);
+   return PyMemoryView_FromMemory(writable, size, PyBUF_WRITE);
+ #else
+   void* writable = reinterpret_cast<void*>(data);
+   return PyBuffer_FromReadWriteMemory(writable, size);
+ #endif
+ }
+
+ /* hash(client, object_id) -> bytes or None
+  *
+  * Returns the kDigestSize-byte digest computed by
+  * plasma_compute_object_hash, or None when that call reports failure. */
+ PyObject* PyPlasma_hash(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   ObjectID object_id;
+   const int parsed = PyArg_ParseTuple(
+       args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+   if (!parsed) { return NULL; }
+
+   unsigned char digest[kDigestSize];
+   if (!plasma_compute_object_hash(client, object_id, digest)) { Py_RETURN_NONE; }
+   return PyBytes_FromStringAndSize(reinterpret_cast<char*>(digest), kDigestSize);
+ }
+
+ /* seal(client, object_id) -> None
+  *
+  * Seals the given object; a failing status trips ARROW_CHECK_OK. */
+ PyObject* PyPlasma_seal(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   ObjectID object_id;
+   const int parsed = PyArg_ParseTuple(
+       args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+   if (!parsed) { return NULL; }
+
+   ARROW_CHECK_OK(client->Seal(object_id));
+   Py_RETURN_NONE;
+ }
+
+ /* release(client, object_id) -> None
+  *
+  * Releases the given object; a failing status trips ARROW_CHECK_OK. */
+ PyObject* PyPlasma_release(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   ObjectID object_id;
+   const int parsed = PyArg_ParseTuple(
+       args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+   if (!parsed) { return NULL; }
+
+   ARROW_CHECK_OK(client->Release(object_id));
+   Py_RETURN_NONE;
+ }
+
+ /* get(client, object_ids, timeout_ms) -> list
+  *
+  * Fetches the given objects from the store. The result list has one entry
+  * per requested ID: a (data, metadata) tuple of read-only buffers if the
+  * object was retrieved, or None if it was not (data_size == -1).
+  *
+  * Raises TypeError if `object_ids` is not a list or contains an element that
+  * PyStringToUniqueID rejects. A failing Get status trips ARROW_CHECK_OK. */
+ PyObject* PyPlasma_get(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   PyObject* object_id_list;
+   Py_ssize_t timeout_ms;
+   if (!PyArg_ParseTuple(
+           args, "O&On", PyObjectToPlasmaClient, &client, &object_id_list, &timeout_ms)) {
+     return NULL;
+   }
+   /* PyList_Size returns -1 for non-lists, which must never be used to size
+    * the vectors below, so reject anything that is not a list up front. */
+   if (!PyList_Check(object_id_list)) {
+     PyErr_SetString(PyExc_TypeError, "object_ids must be a list");
+     return NULL;
+   }
+
+   Py_ssize_t num_object_ids = PyList_Size(object_id_list);
+   std::vector<ObjectID> object_ids(num_object_ids);
+   std::vector<ObjectBuffer> object_buffers(num_object_ids);
+
+   for (Py_ssize_t i = 0; i < num_object_ids; ++i) {
+     /* PyStringToUniqueID sets a TypeError and returns 0 on bad input; stop
+      * here instead of querying the store with an uninitialized ID. */
+     if (!PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i])) {
+       return NULL;
+     }
+   }
+
+   /* Drop the global interpreter lock while blocking on the store. */
+   Py_BEGIN_ALLOW_THREADS;
+   ARROW_CHECK_OK(
+       client->Get(object_ids.data(), num_object_ids, timeout_ms, object_buffers.data()));
+   Py_END_ALLOW_THREADS;
+
+   PyObject* returns = PyList_New(num_object_ids);
+   for (Py_ssize_t i = 0; i < num_object_ids; ++i) {
+     if (object_buffers[i].data_size != -1) {
+       /* The object was retrieved, so return the object. */
+       PyObject* t = PyTuple_New(2);
+       Py_ssize_t data_size = static_cast<Py_ssize_t>(object_buffers[i].data_size);
+       Py_ssize_t metadata_size = static_cast<Py_ssize_t>(object_buffers[i].metadata_size);
+ #if PY_MAJOR_VERSION >= 3
+       char* data = reinterpret_cast<char*>(object_buffers[i].data);
+       char* metadata = reinterpret_cast<char*>(object_buffers[i].metadata);
+       PyTuple_SET_ITEM(t, 0, PyMemoryView_FromMemory(data, data_size, PyBUF_READ));
+       PyTuple_SET_ITEM(
+           t, 1, PyMemoryView_FromMemory(metadata, metadata_size, PyBUF_READ));
+ #else
+       void* data = reinterpret_cast<void*>(object_buffers[i].data);
+       void* metadata = reinterpret_cast<void*>(object_buffers[i].metadata);
+       PyTuple_SET_ITEM(t, 0, PyBuffer_FromMemory(data, data_size));
+       PyTuple_SET_ITEM(t, 1, PyBuffer_FromMemory(metadata, metadata_size));
+ #endif
+       /* PyList_SetItem steals the reference to t. */
+       ARROW_CHECK(PyList_SetItem(returns, i, t) == 0);
+     } else {
+       /* The object was not retrieved, so just add None to the list of return
+        * values. PyList_SetItem steals a reference, hence the INCREF. */
+       Py_INCREF(Py_None);
+       ARROW_CHECK(PyList_SetItem(returns, i, Py_None) == 0);
+     }
+   }
+   return returns;
+ }
+
+ /* contains(client, object_id) -> bool
+  *
+  * Returns True or False according to the flag filled in by
+  * PlasmaClient::Contains; a failing status trips ARROW_CHECK_OK. */
+ PyObject* PyPlasma_contains(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   ObjectID object_id;
+   const int parsed = PyArg_ParseTuple(
+       args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+   if (!parsed) { return NULL; }
+
+   bool has_object;
+   ARROW_CHECK_OK(client->Contains(object_id, &has_object));
+
+   if (!has_object) { Py_RETURN_FALSE; }
+   Py_RETURN_TRUE;
+ }
+
+ /* fetch(client, object_ids) -> None
+  *
+  * Passes the listed object IDs to PlasmaClient::Fetch.
+  *
+  * Raises RuntimeError if the client is not connected to a manager, and
+  * TypeError if `object_ids` is not a list or contains an element that
+  * PyStringToUniqueID rejects. A failing Fetch status trips ARROW_CHECK_OK. */
+ PyObject* PyPlasma_fetch(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   PyObject* object_id_list;
+   if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaClient, &client, &object_id_list)) {
+     return NULL;
+   }
+   if (client->get_manager_fd() == -1) {
+     PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
+     return NULL;
+   }
+   /* PyList_Size returns -1 for non-lists, which must never be used as an
+    * allocation size, so reject anything that is not a list up front. */
+   if (!PyList_Check(object_id_list)) {
+     PyErr_SetString(PyExc_TypeError, "object_ids must be a list");
+     return NULL;
+   }
+   Py_ssize_t n = PyList_Size(object_id_list);
+   /* A vector releases the ID buffer on every exit path. */
+   std::vector<ObjectID> object_ids(n);
+   for (Py_ssize_t i = 0; i < n; ++i) {
+     /* PyStringToUniqueID sets a TypeError and returns 0 on bad input. */
+     if (!PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i])) {
+       return NULL;
+     }
+   }
+   ARROW_CHECK_OK(client->Fetch(static_cast<int>(n), object_ids.data()));
+   Py_RETURN_NONE;
+ }
+
+ /* wait(client, object_ids, timeout, num_returns) -> (ready_ids, waiting_ids)
+  *
+  * Waits on the given object IDs through PlasmaClient::Wait and returns a
+  * tuple of (list of IDs reported as local or remote, set of the remaining
+  * IDs). At most `num_returns` IDs are placed in the ready list.
+  *
+  * Raises TypeError if `object_ids` is not a list or contains an element that
+  * PyStringToUniqueID rejects, and RuntimeError if the client has no manager
+  * connection, if num_returns is negative or larger than len(object_ids), or
+  * if timeout exceeds 2 ** 30. A failing Wait status trips ARROW_CHECK_OK. */
+ PyObject* PyPlasma_wait(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   PyObject* object_id_list;
+   Py_ssize_t timeout;
+   int num_returns;
+   if (!PyArg_ParseTuple(args, "O&Oni", PyObjectToPlasmaClient, &client, &object_id_list,
+           &timeout, &num_returns)) {
+     return NULL;
+   }
+   /* PyList_Size returns -1 for non-lists; report that case explicitly
+    * instead of letting it fall into the num_returns range check. */
+   if (!PyList_Check(object_id_list)) {
+     PyErr_SetString(PyExc_TypeError, "object_ids must be a list");
+     return NULL;
+   }
+   Py_ssize_t n = PyList_Size(object_id_list);
+
+   if (client->get_manager_fd() == -1) {
+     PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
+     return NULL;
+   }
+   if (num_returns < 0) {
+     PyErr_SetString(
+         PyExc_RuntimeError, "The argument num_returns cannot be less than zero.");
+     return NULL;
+   }
+   if (num_returns > n) {
+     PyErr_SetString(PyExc_RuntimeError,
+         "The argument num_returns cannot be greater than len(object_ids)");
+     return NULL;
+   }
+   int64_t threshold = 1 << 30;
+   if (timeout > threshold) {
+     PyErr_SetString(
+         PyExc_RuntimeError, "The argument timeout cannot be greater than 2 ** 30.");
+     return NULL;
+   }
+
+   std::vector<ObjectRequest> object_requests(n);
+   for (Py_ssize_t i = 0; i < n; ++i) {
+     /* A malformed ID is a caller error: surface the TypeError set by
+      * PyStringToUniqueID rather than aborting the interpreter. */
+     if (PyStringToUniqueID(PyList_GetItem(object_id_list, i),
+             &object_requests[i].object_id) != 1) {
+       return NULL;
+     }
+     object_requests[i].type = PLASMA_QUERY_ANYWHERE;
+   }
+   /* Drop the global interpreter lock while we are waiting, so other threads can
+    * run. */
+   int num_return_objects;
+   Py_BEGIN_ALLOW_THREADS;
+   ARROW_CHECK_OK(
+       client->Wait(n, object_requests.data(), num_returns, timeout, &num_return_objects));
+   Py_END_ALLOW_THREADS;
+
+   int num_to_return = std::min(num_return_objects, num_returns);
+   PyObject* ready_ids = PyList_New(num_to_return);
+   PyObject* waiting_ids = PySet_New(object_id_list);
+   if (ready_ids == NULL || waiting_ids == NULL) {
+     Py_XDECREF(ready_ids);
+     Py_XDECREF(waiting_ids);
+     return NULL;
+   }
+   int num_returned = 0;
+   for (Py_ssize_t i = 0; i < n; ++i) {
+     if (num_returned == num_to_return) { break; }
+     if (object_requests[i].status == ObjectStatus_Local ||
+         object_requests[i].status == ObjectStatus_Remote) {
+       PyObject* ready = PyBytes_FromStringAndSize(
+           reinterpret_cast<char*>(&object_requests[i].object_id),
+           sizeof(object_requests[i].object_id));
+       if (ready == NULL) {
+         Py_DECREF(ready_ids);
+         Py_DECREF(waiting_ids);
+         return NULL;
+       }
+       /* PyList_SetItem steals the reference; the list keeps `ready` alive
+        * for the PySet_Discard call below. */
+       PyList_SetItem(ready_ids, num_returned, ready);
+       PySet_Discard(waiting_ids, ready);
+       num_returned += 1;
+     } else {
+       ARROW_CHECK(object_requests[i].status == ObjectStatus_Nonexistent);
+     }
+   }
+   ARROW_CHECK(num_returned == num_to_return);
+   /* Return both the ready IDs and the remaining IDs. */
+   PyObject* t = PyTuple_New(2);
+   if (t == NULL) {
+     Py_DECREF(ready_ids);
+     Py_DECREF(waiting_ids);
+     return NULL;
+   }
+   /* PyTuple_SetItem steals both references. */
+   PyTuple_SetItem(t, 0, ready_ids);
+   PyTuple_SetItem(t, 1, waiting_ids);
+   return t;
+ }
+
+ /* evict(client, num_bytes) -> int
+  *
+  * Asks the store to evict `num_bytes` bytes and returns the byte count
+  * reported back by PlasmaClient::Evict. */
+ PyObject* PyPlasma_evict(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   Py_ssize_t num_bytes;
+   const int parsed =
+       PyArg_ParseTuple(args, "O&n", PyObjectToPlasmaClient, &client, &num_bytes);
+   if (!parsed) { return NULL; }
+
+   const int64_t requested_bytes = static_cast<int64_t>(num_bytes);
+   int64_t evicted_bytes;
+   ARROW_CHECK_OK(client->Evict(requested_bytes, evicted_bytes));
+
+   const Py_ssize_t reported_bytes = static_cast<Py_ssize_t>(evicted_bytes);
+   return PyLong_FromSsize_t(reported_bytes);
+ }
+
+ /* delete(client, object_id) -> None
+  *
+  * Deletes the given object; a failing status trips ARROW_CHECK_OK. */
+ PyObject* PyPlasma_delete(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   ObjectID object_id;
+   const int parsed = PyArg_ParseTuple(
+       args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+   if (!parsed) { return NULL; }
+
+   ARROW_CHECK_OK(client->Delete(object_id));
+   Py_RETURN_NONE;
+ }
+
+ /* transfer(client, object_id, addr, port) -> None
+  *
+  * Passes the object ID and the destination address/port to
+  * PlasmaClient::Transfer. Raises RuntimeError when the client has no manager
+  * connection; a failing Transfer status trips ARROW_CHECK_OK. */
+ PyObject* PyPlasma_transfer(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   ObjectID object_id;
+   const char* addr;
+   int port;
+   const int parsed = PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaClient, &client,
+       PyStringToUniqueID, &object_id, &addr, &port);
+   if (!parsed) { return NULL; }
+
+   const bool has_manager = client->get_manager_fd() != -1;
+   if (!has_manager) {
+     PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
+     return NULL;
+   }
+
+   ARROW_CHECK_OK(client->Transfer(addr, port, object_id));
+   Py_RETURN_NONE;
+ }
+
+ /* subscribe(client) -> int
+  *
+  * Subscribes through PlasmaClient::Subscribe and returns the resulting
+  * socket descriptor as a Python integer. */
+ PyObject* PyPlasma_subscribe(PyObject* self, PyObject* args) {
+   PlasmaClient* client;
+   const int parsed = PyArg_ParseTuple(args, "O&", PyObjectToPlasmaClient, &client);
+   if (!parsed) { return NULL; }
+
+   int notification_fd;
+   ARROW_CHECK_OK(client->Subscribe(&notification_fd));
+   return PyLong_FromLong(notification_fd);
+ }
+
+ /* receive_notification(plasma_sock) -> (object_id, data_size, metadata_size)
+  *
+  * Reads one object notification from the plasma connection socket and
+  * returns its fields as a tuple. For a deletion notification both sizes are
+  * reported as -1. Raises RuntimeError if no message could be read. */
+ PyObject* PyPlasma_receive_notification(PyObject* self, PyObject* args) {
+   int plasma_sock;
+   const int parsed = PyArg_ParseTuple(args, "i", &plasma_sock);
+   if (!parsed) { return NULL; }
+
+   uint8_t* notification = read_message_async(plasma_sock);
+   if (notification == NULL) {
+     PyErr_SetString(
+         PyExc_RuntimeError, "Failed to read object notification from Plasma socket");
+     return NULL;
+   }
+
+   /* View the raw message as an ObjectInfo flatbuffer and unpack it. */
+   auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
+   PyObject* result = PyTuple_New(3);
+   PyObject* id_bytes = PyBytes_FromStringAndSize(
+       object_info->object_id()->data(), object_info->object_id()->size());
+   PyTuple_SetItem(result, 0, id_bytes);
+
+   const bool was_deleted = object_info->is_deletion();
+   PyTuple_SetItem(
+       result, 1, PyLong_FromLong(was_deleted ? -1 : object_info->data_size()));
+   PyTuple_SetItem(
+       result, 2, PyLong_FromLong(was_deleted ? -1 : object_info->metadata_size()));
+
+   /* The message buffer is released here with delete[]. */
+   delete[] notification;
+   return result;
+ }
+
+ /* Method table of the libplasma module: Python-visible name, C entry point,
+  * calling convention and docstring for every exported function. The table
+  * is terminated by an all-zero sentinel entry. */
+ static PyMethodDef plasma_methods[] = {
+     /* Connection management. */
+     {"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."},
+     {"disconnect", PyPlasma_disconnect, METH_VARARGS, "Disconnect from plasma."},
+     /* Object lifecycle and lookup. */
+     {"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."},
+     {"hash", PyPlasma_hash, METH_VARARGS, "Compute the hash of a plasma object."},
+     {"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."},
+     {"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."},
+     {"contains", PyPlasma_contains, METH_VARARGS,
+         "Does the plasma store contain this plasma object?"},
+     {"fetch", PyPlasma_fetch, METH_VARARGS,
+         "Fetch the object from another plasma manager instance."},
+     {"wait", PyPlasma_wait, METH_VARARGS,
+         "Wait until num_returns objects in object_ids are ready."},
+     {"evict", PyPlasma_evict, METH_VARARGS,
+         "Evict some objects until we recover some number of bytes."},
+     {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."},
+     {"delete", PyPlasma_delete, METH_VARARGS, "Delete a plasma object."},
+     {"transfer", PyPlasma_transfer, METH_VARARGS,
+         "Transfer object to another plasma manager."},
+     /* Notifications. */
+     {"subscribe", PyPlasma_subscribe, METH_VARARGS,
+         "Subscribe to the plasma notification socket."},
+     {"receive_notification", PyPlasma_receive_notification, METH_VARARGS,
+         "Receive next notification from plasma notification socket."},
+     {NULL, NULL, 0, NULL} /* Sentinel */
+ };
+
+ /* Python 3 only: static description of the module that the init function
+  * passes to PyModule_Create. Python 2 builds register the same name, method
+  * table and docstring through Py_InitModule3 instead. The initializer is
+  * positional, so the entries must stay in PyModuleDef field order. */
+ #if PY_MAJOR_VERSION >= 3
+ static struct PyModuleDef moduledef = {
+ PyModuleDef_HEAD_INIT, "libplasma", /* m_name */
+ "A Python client library for plasma.", /* m_doc */
+ 0, /* m_size */
+ plasma_methods, /* m_methods */
+ NULL, /* m_reload */
+ NULL, /* m_traverse */
+ NULL, /* m_clear */
+ NULL, /* m_free */
+ };
+ #endif
+
+ /* INITERROR: statement for leaving the module init function on failure.
+  * It returns NULL on Python 3 and is a bare return on Python 2. */
+ #if PY_MAJOR_VERSION >= 3
+ #define INITERROR return NULL
+ #else
+ #define INITERROR return
+ #endif
+
+ /* Fallback for headers that do not define PyMODINIT_FUNC. */
+ #ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
+ #define PyMODINIT_FUNC void
+ #endif
+
+ /* MOD_INIT(name): expands to the signature of the module init function,
+  * PyInit_<name> on Python 3 and init<name> on Python 2. */
+ #if PY_MAJOR_VERSION >= 3
+ #define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
+ #else
+ #define MOD_INIT(name) PyMODINIT_FUNC init##name(void)
+ #endif
+
+ /* Module init function for libplasma. Creates the module object, then
+  * creates the two custom exception types and attaches them to the module as
+  * `plasma_object_exists_error` and `plasma_out_of_memory_error`.
+  *
+  * If the module or either exception cannot be created, initialization is
+  * abandoned through INITERROR with the Python error left set, instead of
+  * passing a NULL pointer on to Py_INCREF / PyModule_AddObject. */
+ MOD_INIT(libplasma) {
+ #if PY_MAJOR_VERSION >= 3
+   PyObject* m = PyModule_Create(&moduledef);
+ #else
+   PyObject* m =
+       Py_InitModule3("libplasma", plasma_methods, "A Python client library for plasma.");
+ #endif
+   if (m == NULL) { INITERROR; }
+
+   /* Create a custom exception for when an object ID is reused. */
+   char plasma_object_exists_error[] = "plasma_object_exists.error";
+   PlasmaObjectExistsError = PyErr_NewException(plasma_object_exists_error, NULL, NULL);
+   if (PlasmaObjectExistsError == NULL) { INITERROR; }
+   /* PyModule_AddObject steals a reference on success; the extra INCREF keeps
+    * the global pointer valid independently of the module. */
+   Py_INCREF(PlasmaObjectExistsError);
+   PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError);
+   /* Create a custom exception for when the plasma store is out of memory. */
+   char plasma_out_of_memory_error[] = "plasma_out_of_memory.error";
+   PlasmaOutOfMemoryError = PyErr_NewException(plasma_out_of_memory_error, NULL, NULL);
+   if (PlasmaOutOfMemoryError == NULL) { INITERROR; }
+   Py_INCREF(PlasmaOutOfMemoryError);
+   PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError);
+
+ #if PY_MAJOR_VERSION >= 3
+   return m;
+ #endif
+ }
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/extension.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.h b/cpp/src/plasma/extension.h
new file mode 100644
index 0000000..cee30ab
--- /dev/null
+++ b/cpp/src/plasma/extension.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_EXTENSION_H
+#define PLASMA_EXTENSION_H
+
+#undef _XOPEN_SOURCE
+#undef _POSIX_C_SOURCE
+#include <Python.h>
+
+#include "bytesobject.h" // NOLINT
+
+#include "plasma/client.h"
+#include "plasma/common.h"
+
+ /* "O&" converter for PyArg_ParseTuple: extracts the PlasmaClient pointer
+  * from a capsule named "plasma". Returns 1 on success; otherwise sets a
+  * TypeError and returns 0, leaving *client untouched. */
+ static int PyObjectToPlasmaClient(PyObject* object, PlasmaClient** client) {
+   if (!PyCapsule_IsValid(object, "plasma")) {
+     PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule");
+     return 0;
+   }
+   void* capsule_pointer = PyCapsule_GetPointer(object, "plasma");
+   *client = reinterpret_cast<PlasmaClient*>(capsule_pointer);
+   return 1;
+ }
+
+ /* "O&" converter for PyArg_ParseTuple: copies the raw bytes of a Python
+  * bytes object into *object_id.
+  *
+  * The object must be a bytes object whose length is exactly
+  * sizeof(ObjectID); anything else (including a bytes object that is too
+  * short, which would otherwise be read past its end by the memcpy) is
+  * rejected.
+  *
+  * Returns 1 on success; otherwise sets a TypeError and returns 0, leaving
+  * *object_id untouched. */
+ int PyStringToUniqueID(PyObject* object, ObjectID* object_id) {
+   if (PyBytes_Check(object) &&
+       PyBytes_Size(object) == static_cast<Py_ssize_t>(sizeof(ObjectID))) {
+     memcpy(object_id, PyBytes_AsString(object), sizeof(ObjectID));
+     return 1;
+   } else {
+     PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
+     return 0;
+   }
+ }
+
+#endif // PLASMA_EXTENSION_H