You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/23 18:39:21 UTC

[14/14] arrow git commit: [C++] Restore Plasma source tree after 0.5.0 release

[C++] Restore Plasma source tree after 0.5.0 release

This reverts commit 62ef2cd8a39fc93e7fa4bb790d7cd92adb77571f.


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/2c810151
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/2c810151
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/2c810151

Branch: refs/heads/master
Commit: 2c8101515e2a0ab515c03f82dc84b02ca6c466da
Parents: 9b26ed8
Author: Wes McKinney <we...@twosigma.com>
Authored: Sun Jul 23 14:37:54 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Sun Jul 23 14:37:54 2017 -0400

----------------------------------------------------------------------
 cpp/src/plasma/CMakeLists.txt              |  113 +
 cpp/src/plasma/client.cc                   |  557 ++
 cpp/src/plasma/client.h                    |  343 ++
 cpp/src/plasma/common.cc                   |   83 +
 cpp/src/plasma/common.h                    |   63 +
 cpp/src/plasma/events.cc                   |   81 +
 cpp/src/plasma/events.h                    |   99 +
 cpp/src/plasma/eviction_policy.cc          |  107 +
 cpp/src/plasma/eviction_policy.h           |  134 +
 cpp/src/plasma/extension.cc                |  456 ++
 cpp/src/plasma/extension.h                 |   50 +
 cpp/src/plasma/fling.cc                    |   90 +
 cpp/src/plasma/fling.h                     |   52 +
 cpp/src/plasma/format/.gitignore           |    1 +
 cpp/src/plasma/format/common.fbs           |   34 +
 cpp/src/plasma/format/plasma.fbs           |  291 ++
 cpp/src/plasma/io.cc                       |  212 +
 cpp/src/plasma/io.h                        |   55 +
 cpp/src/plasma/malloc.cc                   |  178 +
 cpp/src/plasma/malloc.h                    |   26 +
 cpp/src/plasma/plasma.cc                   |   64 +
 cpp/src/plasma/plasma.h                    |  191 +
 cpp/src/plasma/protocol.cc                 |  502 ++
 cpp/src/plasma/protocol.h                  |  170 +
 cpp/src/plasma/store.cc                    |  683 +++
 cpp/src/plasma/store.h                     |  169 +
 cpp/src/plasma/test/client_tests.cc        |  132 +
 cpp/src/plasma/test/run_tests.sh           |   61 +
 cpp/src/plasma/test/run_valgrind.sh        |   27 +
 cpp/src/plasma/test/serialization_tests.cc |  388 ++
 cpp/src/plasma/thirdparty/ae/ae.c          |  465 ++
 cpp/src/plasma/thirdparty/ae/ae.h          |  123 +
 cpp/src/plasma/thirdparty/ae/ae_epoll.c    |  135 +
 cpp/src/plasma/thirdparty/ae/ae_evport.c   |  320 ++
 cpp/src/plasma/thirdparty/ae/ae_kqueue.c   |  138 +
 cpp/src/plasma/thirdparty/ae/ae_select.c   |  106 +
 cpp/src/plasma/thirdparty/ae/config.h      |   54 +
 cpp/src/plasma/thirdparty/ae/zmalloc.h     |   45 +
 cpp/src/plasma/thirdparty/dlmalloc.c       | 6281 +++++++++++++++++++++++
 cpp/src/plasma/thirdparty/xxhash.cc        |  889 ++++
 cpp/src/plasma/thirdparty/xxhash.h         |  293 ++
 41 files changed, 14261 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
new file mode 100644
index 0000000..4ff3beb
--- /dev/null
+++ b/cpp/src/plasma/CMakeLists.txt
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+cmake_minimum_required(VERSION 2.8)
+
+project(plasma)
+
+find_package(Threads)
+option(PLASMA_PYTHON "Build the Plasma Python extensions" OFF)
+# Python is only required for the extension (assumes no other source needs it).
+if(PLASMA_PYTHON)
+  find_package(PythonLibsNew REQUIRED)
+  include_directories(SYSTEM ${PYTHON_INCLUDE_DIRS})
+endif()
+
+if(APPLE)
+  set(CMAKE_SHARED_LIBRARY_SUFFIX ".so")
+endif()
+
+include_directories("${FLATBUFFERS_INCLUDE_DIR}" "${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/" "${CMAKE_CURRENT_LIST_DIR}/../")
+
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -D_XOPEN_SOURCE=500 -D_POSIX_C_SOURCE=200809L")
+
+set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion")
+
+# Compile flatbuffers
+
+set(PLASMA_FBS_SRC "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs" "${CMAKE_CURRENT_LIST_DIR}/format/common.fbs")
+set(OUTPUT_DIR "${CMAKE_CURRENT_LIST_DIR}/format")
+
+set(PLASMA_FBS_OUTPUT_FILES
+  "${OUTPUT_DIR}/common_generated.h"
+  "${OUTPUT_DIR}/plasma_generated.h")
+
+add_custom_target(gen_plasma_fbs DEPENDS ${PLASMA_FBS_OUTPUT_FILES})
+
+if(FLATBUFFERS_VENDORED)
+  add_dependencies(gen_plasma_fbs flatbuffers_ep)
+endif()
+
+add_custom_command(
+  OUTPUT ${PLASMA_FBS_OUTPUT_FILES}
+  # The --gen-object-api flag generates a C++ class MessageT for each
+  # flatbuffers message Message, which can be used to store deserialized
+  # messages in data structures. This is currently used for ObjectInfo for
+  # example.
+  COMMAND ${FLATBUFFERS_COMPILER} -c -o ${OUTPUT_DIR}/ ${PLASMA_FBS_SRC} --gen-object-api
+  DEPENDS ${PLASMA_FBS_SRC}
+  COMMENT "Running flatc compiler on ${PLASMA_FBS_SRC}"
+  VERBATIM)
+
+if(UNIX AND NOT APPLE)
+  link_libraries(rt)
+endif()
+
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
+
+set_source_files_properties(extension.cc PROPERTIES COMPILE_FLAGS -Wno-strict-aliasing)
+
+set(PLASMA_SRCS
+  client.cc
+  common.cc
+  eviction_policy.cc
+  events.cc
+  fling.cc
+  io.cc
+  malloc.cc
+  plasma.cc
+  protocol.cc
+  thirdparty/ae/ae.c
+  thirdparty/xxhash.cc)
+
+ADD_ARROW_LIB(plasma
+  SOURCES ${PLASMA_SRCS}
+  DEPENDENCIES gen_plasma_fbs
+  SHARED_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} arrow_static
+  STATIC_LINK_LIBS ${FLATBUFFERS_STATIC_LIB} ${CMAKE_THREAD_LIBS_INIT} arrow_static)
+
+# The optimization flag -O3 is suggested by dlmalloc.c, which is #included in
+# malloc.cc; we set it here regardless of whether we do a debug or release build.
+set_source_files_properties(malloc.cc PROPERTIES COMPILE_FLAGS "-Wno-error -O3")
+
+add_executable(plasma_store store.cc)
+target_link_libraries(plasma_store plasma_static)
+
+ADD_ARROW_TEST(test/serialization_tests)
+ARROW_TEST_LINK_LIBRARIES(test/serialization_tests plasma_static)
+ADD_ARROW_TEST(test/client_tests)
+ARROW_TEST_LINK_LIBRARIES(test/client_tests plasma_static)
+
+if(PLASMA_PYTHON)
+  add_library(plasma_extension SHARED extension.cc)
+
+  if(APPLE)
+    target_link_libraries(plasma_extension plasma_static "-undefined dynamic_lookup")
+  else()
+    target_link_libraries(plasma_extension plasma_static -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive)
+  endif()
+endif()

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
new file mode 100644
index 0000000..dcb78e7
--- /dev/null
+++ b/cpp/src/plasma/client.cc
@@ -0,0 +1,557 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// PLASMA CLIENT: Client library for using the plasma store and manager
+
+#include "plasma/client.h"
+
+#ifdef _WIN32
+#include <Win32_Interop/win32_types.h>
+#endif
+
+#include <assert.h>
+#include <fcntl.h>
+#include <netinet/in.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <strings.h>
+#include <sys/ioctl.h>
+#include <sys/mman.h>
+#include <sys/socket.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <algorithm>
+#include <thread>
+#include <vector>
+
+#include "plasma/common.h"
+#include "plasma/fling.h"
+#include "plasma/io.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+
+#define XXH_STATIC_LINKING_ONLY
+#include "thirdparty/xxhash.h"
+
+#define XXH64_DEFAULT_SEED 0
+
+// Number of threads used for memcopy and hash computations.
+constexpr int64_t kThreadPoolSize = 8;
+constexpr int64_t kBytesInMB = 1 << 20;
+static std::vector<std::thread> threadpool_(kThreadPoolSize);
+
+// If the file descriptor fd has been mmapped in this client process before,
+// return the pointer that was returned by mmap, otherwise mmap it and store the
+// pointer in a hash table.
+uint8_t* PlasmaClient::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size) {
+  auto entry = mmap_table_.find(store_fd_val);
+  if (entry != mmap_table_.end()) {
+    close(fd);
+    return entry->second.pointer;
+  } else {
+    uint8_t* result = reinterpret_cast<uint8_t*>(
+        mmap(NULL, map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
+    // TODO(pcm): Don't fail here, instead return a Status.
+    if (result == MAP_FAILED) { ARROW_LOG(FATAL) << "mmap failed"; }
+    close(fd);
+    ClientMmapTableEntry& entry = mmap_table_[store_fd_val];
+    entry.pointer = result;
+    entry.length = map_size;
+    entry.count = 0;
+    return result;
+  }
+}
+
+// Get a pointer to a file that we know has been memory mapped in this client
+// process before.
+uint8_t* PlasmaClient::lookup_mmapped_file(int store_fd_val) {
+  auto entry = mmap_table_.find(store_fd_val);
+  ARROW_CHECK(entry != mmap_table_.end());
+  return entry->second.pointer;
+}
+
+void PlasmaClient::increment_object_count(
+    const ObjectID& object_id, PlasmaObject* object, bool is_sealed) {
+  // Increment the count of the object to track the fact that it is being used.
+  // The corresponding decrement should happen in PlasmaClient::Release.
+  auto elem = objects_in_use_.find(object_id);
+  ObjectInUseEntry* object_entry;
+  if (elem == objects_in_use_.end()) {
+    // Add this object ID to the hash table of object IDs in use. The
+    // corresponding call to free happens in PlasmaClient::Release.
+    objects_in_use_[object_id] =
+        std::unique_ptr<ObjectInUseEntry>(new ObjectInUseEntry());
+    objects_in_use_[object_id]->object = *object;
+    objects_in_use_[object_id]->count = 0;
+    objects_in_use_[object_id]->is_sealed = is_sealed;
+    object_entry = objects_in_use_[object_id].get();
+    // Increment the count of the number of objects in the memory-mapped file
+    // that are being used. The corresponding decrement should happen in
+    // PlasmaClient::Release.
+    auto entry = mmap_table_.find(object->handle.store_fd);
+    ARROW_CHECK(entry != mmap_table_.end());
+    ARROW_CHECK(entry->second.count >= 0);
+    // Update the in_use_object_bytes_.
+    in_use_object_bytes_ +=
+        (object_entry->object.data_size + object_entry->object.metadata_size);
+    entry->second.count += 1;
+  } else {
+    object_entry = elem->second.get();
+    ARROW_CHECK(object_entry->count > 0);
+  }
+  // Increment the count of the number of instances of this object that are
+  // being used by this client. The corresponding decrement should happen in
+  // PlasmaClient::Release.
+  object_entry->count += 1;
+}
+
+Status PlasmaClient::Create(const ObjectID& object_id, int64_t data_size,
+    uint8_t* metadata, int64_t metadata_size, uint8_t** data) {
+  ARROW_LOG(DEBUG) << "called plasma_create on conn " << store_conn_ << " with size "
+                   << data_size << " and metadata size " << metadata_size;
+  RETURN_NOT_OK(SendCreateRequest(store_conn_, object_id, data_size, metadata_size));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaCreateReply, &buffer));
+  ObjectID id;
+  PlasmaObject object;
+  RETURN_NOT_OK(ReadCreateReply(buffer.data(), &id, &object));
+  // If the CreateReply included an error, then the store will not send a file
+  // descriptor.
+  int fd = recv_fd(store_conn_);
+  ARROW_CHECK(fd >= 0) << "recv not successful";
+  ARROW_CHECK(object.data_size == data_size);
+  ARROW_CHECK(object.metadata_size == metadata_size);
+  // The metadata should come right after the data.
+  ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
+  *data = lookup_or_mmap(fd, object.handle.store_fd, object.handle.mmap_size) +
+          object.data_offset;
+  // If plasma_create is being called from a transfer, then we will not copy the
+  // metadata here. The metadata will be written along with the data streamed
+  // from the transfer.
+  if (metadata != NULL) {
+    // Copy the metadata to the buffer.
+    memcpy(*data + object.data_size, metadata, metadata_size);
+  }
+  // Increment the count of the number of instances of this object that this
+  // client is using. A call to PlasmaClient::Release is required to decrement
+  // this count. The object is not sealed yet, hence is_sealed is false. Cache
+  // the reference to the object.
+  increment_object_count(object_id, &object, false);
+  // We increment the count a second time (and the corresponding decrement will
+  // happen in the PlasmaClient::Release call at the end of PlasmaClient::Seal)
+  // so that even if the buffer returned by PlasmaClient::Create goes out of
+  // scope and is released, the object does not get released before the call to
+  // PlasmaClient::Seal happens.
+  increment_object_count(object_id, &object, false);
+  return Status::OK();
+}
+
+// Look up the given objects and fill in object_buffers; entries for objects
+// that could not be retrieved are marked with a data_size of -1.
+Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
+    int64_t timeout_ms, ObjectBuffer* object_buffers) {
+  // Fill out the info for the objects that are already in use locally.
+  bool all_present = true;
+  for (int64_t i = 0; i < num_objects; ++i) {
+    auto object_entry = objects_in_use_.find(object_ids[i]);
+    if (object_entry == objects_in_use_.end()) {
+      // This object is not currently in use by this client, so we need to send
+      // a request to the store.
+      all_present = false;
+      // Make a note to ourselves that the object is not present.
+      object_buffers[i].data_size = -1;
+    } else {
+      // NOTE: If the object is still unsealed, we will deadlock, since we must
+      // have been the one who created it.
+      ARROW_CHECK(object_entry->second->is_sealed)
+          << "Plasma client called get on an unsealed object that it created";
+      PlasmaObject* object = &object_entry->second->object;
+      object_buffers[i].data = lookup_mmapped_file(object->handle.store_fd);
+      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      object_buffers[i].data_size = object->data_size;
+      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
+      object_buffers[i].metadata_size = object->metadata_size;
+      // Increment the count of the number of instances of this object that this
+      // client is using. A call to PlasmaClient::Release is required to
+      // decrement this count. Cache the reference to the object.
+      increment_object_count(object_ids[i], object, true);
+    }
+  }
+
+  if (all_present) { return Status::OK(); }
+
+  // If we get here, then the objects aren't all currently in use by this
+  // client, so we need to send a request to the plasma store.
+  RETURN_NOT_OK(SendGetRequest(store_conn_, object_ids, num_objects, timeout_ms));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaGetReply, &buffer));
+  std::vector<ObjectID> received_object_ids(num_objects);
+  std::vector<PlasmaObject> object_data(num_objects);
+  PlasmaObject* object;
+  RETURN_NOT_OK(ReadGetReply(
+      buffer.data(), received_object_ids.data(), object_data.data(), num_objects));
+
+  for (int64_t i = 0; i < num_objects; ++i) {
+    DCHECK(received_object_ids[i] == object_ids[i]);
+    object = &object_data[i];
+    if (object_buffers[i].data_size != -1) {
+      // If the object was already in use by the client, then the store should
+      // have returned it.
+      DCHECK_NE(object->data_size, -1);
+      // We won't use this file descriptor, but the store sent us one, so we
+      // need to receive it and then close it right away so we don't leak file
+      // descriptors. Validate it first so we never call close on a bad fd.
+      int fd = recv_fd(store_conn_);
+      ARROW_CHECK(fd >= 0);
+      close(fd);
+      // We've already filled out the information for this object, so we can
+      // just continue.
+      continue;
+    }
+    // If we are here, the object was not currently in use, so we need to
+    // process the reply from the object store.
+    if (object->data_size != -1) {
+      // The object was retrieved. The user will be responsible for releasing
+      // this object.
+      int fd = recv_fd(store_conn_);
+      ARROW_CHECK(fd >= 0);
+      object_buffers[i].data =
+          lookup_or_mmap(fd, object->handle.store_fd, object->handle.mmap_size);
+      // Finish filling out the return values.
+      object_buffers[i].data = object_buffers[i].data + object->data_offset;
+      object_buffers[i].data_size = object->data_size;
+      object_buffers[i].metadata = object_buffers[i].data + object->data_size;
+      object_buffers[i].metadata_size = object->metadata_size;
+      // Increment the count of the number of instances of this object that this
+      // client is using. A call to PlasmaClient::Release is required to
+      // decrement this count. Cache the reference to the object.
+      increment_object_count(received_object_ids[i], object, true);
+    } else {
+      // The object was not retrieved. Make sure we already put a -1 here to
+      // indicate that the object was not retrieved. The caller is not
+      // responsible for releasing this object.
+      DCHECK_EQ(object_buffers[i].data_size, -1);
+      object_buffers[i].data_size = -1;
+    }
+  }
+  return Status::OK();
+}
+
+/// This is a helper method for implementing PlasmaClient::Release. We maintain
+/// a buffer of release calls and only perform them once the buffer becomes full
+/// (as judged by the aggregate sizes of the objects). There may be multiple
+/// release calls for the same object ID in the buffer. In this case, the first
+/// release calls will not do anything. The client will only send a message to
+/// the store releasing the object when the client is truly done with the object.
+///
+/// @param object_id The object ID to attempt to release.
+/// @return The status of the release request sent to the store, or OK if the
+///     object is still in use by this client.
+Status PlasmaClient::PerformRelease(const ObjectID& object_id) {
+  // Decrement the count of the number of instances of this object that are
+  // being used by this client. The corresponding increment should have happened
+  // in PlasmaClient::Get.
+  auto object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end());
+  object_entry->second->count -= 1;
+  ARROW_CHECK(object_entry->second->count >= 0);
+  // Check if the client is no longer using this object.
+  if (object_entry->second->count == 0) {
+    // Decrement the count of the number of objects in this memory-mapped file
+    // that the client is using. The corresponding increment should have
+    // happened in plasma_get.
+    int fd = object_entry->second->object.handle.store_fd;
+    auto entry = mmap_table_.find(fd);
+    ARROW_CHECK(entry != mmap_table_.end());
+    entry->second.count -= 1;
+    ARROW_CHECK(entry->second.count >= 0);
+    // If none are being used then unmap the file.
+    if (entry->second.count == 0) {
+      munmap(entry->second.pointer, entry->second.length);
+      // Remove the corresponding entry from the hash table.
+      mmap_table_.erase(fd);
+    }
+    // Tell the store that the client no longer needs the object.
+    RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
+    // Update the in_use_object_bytes_.
+    in_use_object_bytes_ -= (object_entry->second->object.data_size +
+                             object_entry->second->object.metadata_size);
+    DCHECK_GE(in_use_object_bytes_, 0);
+    // Remove the entry from the hash table of objects currently in use.
+    objects_in_use_.erase(object_id);
+  }
+  return Status::OK();
+}
+
+Status PlasmaClient::Release(const ObjectID& object_id) {
+  // Add the new object to the release history.
+  release_history_.push_front(object_id);
+  // If there are too many bytes in use by the client or if there are too many
+  // pending release calls, and there are at least some pending release calls in
+  // the release_history list, then release some objects.
+  while ((in_use_object_bytes_ > std::min(kL3CacheSizeBytes, store_capacity_ / 100) ||
+             release_history_.size() > config_.release_delay) &&
+         release_history_.size() > 0) {
+    // Perform a release for the object ID for the first pending release.
+    RETURN_NOT_OK(PerformRelease(release_history_.back()));
+    // Remove the last entry from the release history.
+    release_history_.pop_back();
+  }
+  return Status::OK();
+}
+
+// This method is used to query whether the plasma store contains an object.
+Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
+  // Check if we already have a reference to the object.
+  if (objects_in_use_.count(object_id) > 0) {
+    *has_object = true;
+  } else {
+    // If we don't already have a reference to the object, check with the store
+    // to see if we have the object.
+    RETURN_NOT_OK(SendContainsRequest(store_conn_, object_id));
+    std::vector<uint8_t> buffer;
+    RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaContainsReply, &buffer));
+    ObjectID object_id2;
+    RETURN_NOT_OK(ReadContainsReply(buffer.data(), &object_id2, has_object));
+  }
+  return Status::OK();
+}
+
+static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t* hash) {
+  XXH64_state_t hash_state;
+  XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
+  XXH64_update(&hash_state, data, nbytes);
+  *hash = XXH64_digest(&hash_state);
+}
+
+static inline bool compute_object_hash_parallel(
+    XXH64_state_t* hash_state, const unsigned char* data, int64_t nbytes) {
+  // Note that this function will likely be faster if the address of data is
+  // aligned on a 64-byte boundary.
+  const int num_threads = kThreadPoolSize;
+  uint64_t threadhash[num_threads + 1];
+  const uint64_t data_address = reinterpret_cast<uint64_t>(data);
+  const uint64_t num_blocks = nbytes / BLOCK_SIZE;
+  const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
+  const uint64_t right_address = data_address + chunk_size * num_threads;
+  const uint64_t suffix = (data_address + nbytes) - right_address;
+  // Now the data layout is | k * num_threads * block_size | suffix | ==
+  // | num_threads * chunk_size | suffix |, where chunk_size = k * block_size.
+  // Each thread gets a "chunk" of k blocks, except the suffix thread.
+
+  for (int i = 0; i < num_threads; i++) {
+    threadpool_[i] = std::thread(ComputeBlockHash,
+        reinterpret_cast<uint8_t*>(data_address) + i * chunk_size, chunk_size,
+        &threadhash[i]);
+  }
+  ComputeBlockHash(
+      reinterpret_cast<uint8_t*>(right_address), suffix, &threadhash[num_threads]);
+
+  // Join the threads.
+  for (auto& t : threadpool_) {
+    if (t.joinable()) { t.join(); }
+  }
+
+  XXH64_update(hash_state, (unsigned char*)threadhash, sizeof(threadhash));
+  return true;
+}
+
+static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
+  XXH64_state_t hash_state;
+  XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
+  if (obj_buffer.data_size >= kBytesInMB) {
+    compute_object_hash_parallel(
+        &hash_state, (unsigned char*)obj_buffer.data, obj_buffer.data_size);
+  } else {
+    XXH64_update(&hash_state, (unsigned char*)obj_buffer.data, obj_buffer.data_size);
+  }
+  XXH64_update(
+      &hash_state, (unsigned char*)obj_buffer.metadata, obj_buffer.metadata_size);
+  return XXH64_digest(&hash_state);
+}
+
+bool plasma_compute_object_hash(
+    PlasmaClient* conn, ObjectID object_id, unsigned char* digest) {
+  // Get the plasma object data. We pass in a timeout of 0 to indicate that
+  // the operation should timeout immediately.
+  ObjectBuffer object_buffer;
+  ARROW_CHECK_OK(conn->Get(&object_id, 1, 0, &object_buffer));
+  // If the object was not retrieved, return false.
+  if (object_buffer.data_size == -1) { return false; }
+  // Compute the hash.
+  uint64_t hash = compute_object_hash(object_buffer);
+  memcpy(digest, &hash, sizeof(hash));
+  // Release the plasma object.
+  ARROW_CHECK_OK(conn->Release(object_id));
+  return true;
+}
+
+Status PlasmaClient::Seal(const ObjectID& object_id) {
+  // Make sure this client has a reference to the object before sending the
+  // request to Plasma.
+  auto object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end())
+      << "Plasma client called seal an object without a reference to it";
+  ARROW_CHECK(!object_entry->second->is_sealed)
+      << "Plasma client called seal an already sealed object";
+  object_entry->second->is_sealed = true;
+  // Hash the object and send the seal request. is_sealed is set first because
+  // the hash is computed through Get, which rejects unsealed objects.
+  unsigned char digest[kDigestSize] = {0};
+  ARROW_CHECK(plasma_compute_object_hash(this, object_id, &digest[0]));
+  RETURN_NOT_OK(SendSealRequest(store_conn_, object_id, &digest[0]));
+  // We call PlasmaClient::Release to decrement the number of instances of this
+  // object that are currently being used by this client. The corresponding
+  // increment happened in PlasmaClient::Create and was used to ensure that the
+  // object was not released before the call to PlasmaClient::Seal.
+  return Release(object_id);
+}
+
+Status PlasmaClient::Delete(const ObjectID& object_id) {
+  // TODO(rkn): In the future, we can use this method to give hints to the
+  // eviction policy about when an object will no longer be needed.
+  return Status::NotImplemented("PlasmaClient::Delete is not implemented.");
+}
+
+Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
+  // Send a request to the store to evict objects.
+  RETURN_NOT_OK(SendEvictRequest(store_conn_, num_bytes));
+  // Wait for a response with the number of bytes actually evicted.
+  std::vector<uint8_t> buffer;
+  int64_t type;
+  RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
+  return ReadEvictReply(buffer.data(), num_bytes_evicted);
+}
+
+Status PlasmaClient::Subscribe(int* fd) {
+  int sock[2];
+  // Create a socket pair. This will only be used to send notifications from
+  // the Plasma store to the client. Fail the check if it cannot be created.
+  ARROW_CHECK(socketpair(AF_UNIX, SOCK_STREAM, 0, sock) == 0);
+  // Make the end handed to the store non-blocking.
+  int flags = fcntl(sock[1], F_GETFL, 0);
+  ARROW_CHECK(flags >= 0 && fcntl(sock[1], F_SETFL, flags | O_NONBLOCK) == 0);
+  // Tell the Plasma store about the subscription.
+  RETURN_NOT_OK(SendSubscribeRequest(store_conn_));
+  // Send the file descriptor that the Plasma store should use to push
+  // notifications about sealed objects to this client.
+  ARROW_CHECK(send_fd(store_conn_, sock[1]) >= 0);
+  close(sock[1]);
+  // Return the file descriptor that the client should use to read notifications
+  // about sealed objects.
+  *fd = sock[0];
+  return Status::OK();
+}
+
+Status PlasmaClient::Connect(const std::string& store_socket_name,
+    const std::string& manager_socket_name, int release_delay) {
+  store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
+  if (manager_socket_name != "") {
+    manager_conn_ = connect_ipc_sock_retry(manager_socket_name, -1, -1);
+  } else {
+    manager_conn_ = -1;
+  }
+  config_.release_delay = release_delay;
+  in_use_object_bytes_ = 0;
+  // Send a ConnectRequest to the store to get its memory capacity.
+  RETURN_NOT_OK(SendConnectRequest(store_conn_));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(store_conn_, MessageType_PlasmaConnectReply, &buffer));
+  RETURN_NOT_OK(ReadConnectReply(buffer.data(), &store_capacity_));
+  return Status::OK();
+}
+
+Status PlasmaClient::Disconnect() {
+  // NOTE: We purposefully do not finish sending release calls for objects in
+  // use, so that we don't duplicate PlasmaClient::Release calls (when handling
+  // a SIGTERM, for example).
+
+  // Close the connections to Plasma. The Plasma store will release the objects
+  // that were in use by us when handling the SIGPIPE.
+  close(store_conn_);
+  if (manager_conn_ >= 0) { close(manager_conn_); }
+  return Status::OK();
+}
+
+#define h_addr h_addr_list[0]
+
+Status PlasmaClient::Transfer(const char* address, int port, const ObjectID& object_id) {
+  return SendDataRequest(manager_conn_, object_id, address, port);
+}
+
+Status PlasmaClient::Fetch(int num_object_ids, const ObjectID* object_ids) {
+  ARROW_CHECK(manager_conn_ >= 0);
+  return SendFetchRequest(manager_conn_, object_ids, num_object_ids);
+}
+
+int PlasmaClient::get_manager_fd() {
+  return manager_conn_;
+}
+
+Status PlasmaClient::Info(const ObjectID& object_id, int* object_status) {
+  ARROW_CHECK(manager_conn_ >= 0);
+
+  RETURN_NOT_OK(SendStatusRequest(manager_conn_, &object_id, 1));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaStatusReply, &buffer));
+  ObjectID id;
+  RETURN_NOT_OK(ReadStatusReply(buffer.data(), &id, object_status, 1));
+  ARROW_CHECK(object_id == id);
+  return Status::OK();
+}
+
+Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_requests,
+    int num_ready_objects, int64_t timeout_ms, int* num_objects_ready) {
+  ARROW_CHECK(manager_conn_ >= 0);
+  ARROW_CHECK(num_object_requests > 0);
+  ARROW_CHECK(num_ready_objects > 0);
+  ARROW_CHECK(num_ready_objects <= num_object_requests);
+
+  for (int i = 0; i < num_object_requests; ++i) {
+    ARROW_CHECK(object_requests[i].type == PLASMA_QUERY_LOCAL ||
+                object_requests[i].type == PLASMA_QUERY_ANYWHERE);
+  }
+
+  RETURN_NOT_OK(SendWaitRequest(manager_conn_, object_requests, num_object_requests,
+      num_ready_objects, timeout_ms));
+  std::vector<uint8_t> buffer;
+  RETURN_NOT_OK(PlasmaReceive(manager_conn_, MessageType_PlasmaWaitReply, &buffer));
+  RETURN_NOT_OK(ReadWaitReply(buffer.data(), object_requests, &num_ready_objects));
+
+  *num_objects_ready = 0;
+  for (int i = 0; i < num_object_requests; ++i) {
+    int type = object_requests[i].type;
+    int status = object_requests[i].status;
+    switch (type) {
+      case PLASMA_QUERY_LOCAL:
+        if (status == ObjectStatus_Local) { *num_objects_ready += 1; }
+        break;
+      case PLASMA_QUERY_ANYWHERE:
+        if (status == ObjectStatus_Local || status == ObjectStatus_Remote) {
+          *num_objects_ready += 1;
+        } else {
+          ARROW_CHECK(status == ObjectStatus_Nonexistent);
+        }
+        break;
+      default:
+        ARROW_LOG(FATAL) << "This code should be unreachable.";
+    }
+  }
+  return Status::OK();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/client.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
new file mode 100644
index 0000000..fb3a161
--- /dev/null
+++ b/cpp/src/plasma/client.h
@@ -0,0 +1,343 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_CLIENT_H
+#define PLASMA_CLIENT_H
+
+#include <stdbool.h>
+#include <time.h>
+
+#include <deque>
+#include <string>
+
+#include "plasma/plasma.h"
+
+using arrow::Status;
+
+#define PLASMA_DEFAULT_RELEASE_DELAY 64
+
+// Use 100MB as an overestimate of the L3 cache size.
+constexpr int64_t kL3CacheSizeBytes = 100000000;
+
+/// Object buffer data structure: the data and metadata regions of one object
+/// as handed back to the caller of PlasmaClient::Get.
+struct ObjectBuffer {
+  /// The size in bytes of the data object. PlasmaClient::Get documents that a
+  /// value of -1 means the object was not retrieved.
+  int64_t data_size;
+  /// The address of the data object.
+  uint8_t* data;
+  /// The metadata size in bytes.
+  int64_t metadata_size;
+  /// The address of the metadata.
+  uint8_t* metadata;
+};
+
+/// Configuration options for the plasma client.
+struct PlasmaClientConfig {
+  /// Number of release calls we wait until the object is actually released.
+  /// This allows us to avoid invalidating the CPU cache on workers if objects
+  /// are reused across tasks.
+  size_t release_delay;
+};
+
+/// Bookkeeping for one memory-mapped file held by the client.
+struct ClientMmapTableEntry {
+  /// The result of mmap for this file descriptor.
+  uint8_t* pointer;
+  /// The length of the memory-mapped file.
+  size_t length;
+  /// The number of objects in this memory-mapped file that are currently being
+  /// used by the client. When this count reaches zero, we unmap the file.
+  int count;
+};
+
+/// Per-object usage record kept by the client for every object it holds.
+struct ObjectInUseEntry {
+  /// A count of the number of times this client has called
+  /// PlasmaClient::Create or PlasmaClient::Get on this object ID minus the
+  /// number of calls to PlasmaClient::Release.
+  /// When this count reaches zero, we remove the entry from the ObjectsInUse
+  /// and decrement a count in the relevant ClientMmapTableEntry.
+  int count;
+  /// Cached information to read the object.
+  PlasmaObject object;
+  /// A flag representing whether the object has been sealed.
+  bool is_sealed;
+};
+
+/// Client-side handle for talking to the local Plasma store and, optionally,
+/// the local Plasma manager.
+class PlasmaClient {
+ public:
+  /// Connect to the local plasma store and plasma manager. Return
+  /// the resulting connection.
+  ///
+  /// @param store_socket_name The name of the UNIX domain socket to use to
+  ///        connect to the Plasma store.
+  /// @param manager_socket_name The name of the UNIX domain socket to use to
+  ///        connect to the local Plasma manager. If this is "", then this
+  ///        function will not connect to a manager.
+  /// @param release_delay Number of released objects that are kept around
+  ///        and not evicted to avoid too many munmaps.
+  /// @return The return status.
+  Status Connect(const std::string& store_socket_name,
+      const std::string& manager_socket_name, int release_delay);
+
+  /// Create an object in the Plasma Store. Any metadata for this object must
+  /// be passed in when the object is created.
+  ///
+  /// @param object_id The ID to use for the newly created object.
+  /// @param data_size The size in bytes of the space to be allocated for this
+  ///        object's data (this does not include space used for metadata).
+  /// @param metadata The object's metadata. If there is no metadata, this
+  ///        pointer should be NULL.
+  /// @param metadata_size The size in bytes of the metadata. If there is no
+  ///        metadata, this should be 0.
+  /// @param data The address of the newly created object will be written here.
+  /// @return The return status.
+  Status Create(const ObjectID& object_id, int64_t data_size, uint8_t* metadata,
+      int64_t metadata_size, uint8_t** data);
+
+  /// Get some objects from the Plasma Store. This function will block until the
+  /// objects have all been created and sealed in the Plasma Store or the
+  /// timeout expires. The caller is responsible for releasing any retrieved
+  /// objects, but the caller should not release objects that were not
+  /// retrieved.
+  ///
+  /// @param object_ids The IDs of the objects to get.
+  /// @param num_objects The number of object IDs to get.
+  /// @param timeout_ms The amount of time in milliseconds to wait before this
+  ///        request times out. If this value is -1, then no timeout is set.
+  /// @param object_buffers An array where the results will be stored. If the
+  ///        data size field is -1, then the object was not retrieved.
+  /// @return The return status.
+  Status Get(const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms,
+      ObjectBuffer* object_buffers);
+
+  /// Tell Plasma that the client no longer needs the object. This should be
+  /// called after Get when the client is done with the object. After this
+  /// call, the address returned by Get is no longer valid. This should be
+  /// called once for each call to Get (with the same object ID).
+  ///
+  /// @param object_id The ID of the object that is no longer needed.
+  /// @return The return status.
+  Status Release(const ObjectID& object_id);
+
+  /// Check if the object store contains a particular object and the object has
+  /// been sealed. The result will be stored in has_object.
+  ///
+  /// @todo: We may want to indicate if the object has been created but not
+  /// sealed.
+  ///
+  /// @param object_id The ID of the object whose presence we are checking.
+  /// @param has_object The function will write true at this address if
+  ///        the object is present and false if it is not present.
+  /// @return The return status.
+  Status Contains(const ObjectID& object_id, bool* has_object);
+
+  /// Seal an object in the object store. The object will be immutable after
+  /// this call.
+  ///
+  /// @param object_id The ID of the object to seal.
+  /// @return The return status.
+  Status Seal(const ObjectID& object_id);
+
+  /// Delete an object from the object store. This currently assumes that the
+  /// object is present and has been sealed.
+  ///
+  /// @todo We may want to allow the deletion of objects that are not present or
+  ///       haven't been sealed.
+  ///
+  /// @param object_id The ID of the object to delete.
+  /// @return The return status.
+  Status Delete(const ObjectID& object_id);
+
+  /// Delete objects until we have freed up num_bytes bytes or there are no more
+  /// released objects that can be deleted.
+  ///
+  /// @param num_bytes The number of bytes to try to free up.
+  /// @param num_bytes_evicted Out parameter for total number of bytes of space
+  ///        retrieved.
+  /// @return The return status.
+  Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);
+
+  /// Subscribe to notifications when objects are sealed in the object store.
+  /// Whenever an object is sealed, a message will be written to the client
+  /// socket that is returned by this method.
+  ///
+  /// @param fd Out parameter for the file descriptor the client should use to
+  ///        read notifications from the object store about sealed objects.
+  /// @return The return status.
+  Status Subscribe(int* fd);
+
+  /// Disconnect from the local plasma instance, including the local store and
+  /// manager.
+  ///
+  /// @return The return status.
+  Status Disconnect();
+
+  /// Attempt to initiate the transfer of some objects from remote Plasma
+  /// Stores. This method does not guarantee that the fetched objects will
+  /// arrive locally.
+  ///
+  /// For an object that is available in the local Plasma Store, this method
+  /// will not do anything. For an object that is not available locally, it
+  /// will check if the object is already being fetched. If so, it will not do
+  /// anything. If not, it will query the object table for a list of Plasma
+  /// Managers that have the object. The object table will return a non-empty
+  /// list, and this Plasma Manager will attempt to initiate transfers from one
+  /// of those Plasma Managers.
+  ///
+  /// This function is non-blocking.
+  ///
+  /// This method is idempotent in the sense that it is ok to call it multiple
+  /// times.
+  ///
+  /// @param num_object_ids The number of object IDs fetch is being called on.
+  /// @param object_ids The IDs of the objects that fetch is being called on.
+  /// @return The return status.
+  Status Fetch(int num_object_ids, const ObjectID* object_ids);
+
+  /// Wait for (1) a specified number of objects to be available (sealed) in the
+  /// local Plasma Store or in a remote Plasma Store, or (2) for a timeout to
+  /// expire. This is a blocking call.
+  ///
+  /// @param num_object_requests Size of the object_requests array.
+  /// @param object_requests Object event array. Each element contains a request
+  ///        for a particular object_id. The type of request is specified in the
+  ///        "type" field.
+  ///        - A PLASMA_QUERY_LOCAL request is satisfied when object_id becomes
+  ///          available in the local Plasma Store. In this case, this function
+  ///          sets the "status" field to ObjectStatus_Local. Note, if the
+  ///          status is not ObjectStatus_Local, it will be
+  ///          ObjectStatus_Nonexistent, but it may exist elsewhere in the
+  ///          system.
+  ///        - A PLASMA_QUERY_ANYWHERE request is satisfied when object_id
+  ///          becomes available either at the local Plasma Store or on a
+  ///          remote Plasma Store. In this case, the function sets the
+  ///          "status" field to ObjectStatus_Local or ObjectStatus_Remote.
+  /// @param num_ready_objects The number of requests in object_requests array
+  ///        that must be satisfied before the function returns, unless it
+  ///        times out. The num_ready_objects should be no larger than
+  ///        num_object_requests.
+  /// @param timeout_ms Timeout value in milliseconds. If this timeout expires
+  ///        before num_ready_objects of requests are satisfied, the function
+  ///        returns.
+  /// @param num_objects_ready Out parameter for number of satisfied requests in
+  ///        the object_requests list. If the returned number is less than
+  ///        num_ready_objects this means that timeout expired.
+  /// @return The return status.
+  Status Wait(int64_t num_object_requests, ObjectRequest* object_requests,
+      int num_ready_objects, int64_t timeout_ms, int* num_objects_ready);
+
+  /// Transfer local object to a different plasma manager.
+  ///
+  /// @param addr IP address of the plasma manager we are transferring to.
+  /// @param port Port of the plasma manager we are transferring to.
+  /// @param object_id ObjectID of the object we are transferring.
+  /// @return The return status.
+  Status Transfer(const char* addr, int port, const ObjectID& object_id);
+
+  /// Return the status of a given object. This method may query the object
+  /// table.
+  ///
+  /// @param object_id The ID of the object whose status we query.
+  /// @param object_status Out parameter for object status. Can take the
+  ///         following values.
+  ///         - PLASMA_CLIENT_LOCAL, if object is stored in the local Plasma
+  ///           Store.
+  ///         - PLASMA_CLIENT_TRANSFER, if the object is either currently being
+  ///           transferred or just scheduled.
+  ///         - PLASMA_CLIENT_REMOTE, if the object is stored at a remote
+  ///           Plasma Store.
+  ///         - PLASMA_CLIENT_DOES_NOT_EXIST, if the object does not exist in
+  ///           the system.
+  ///         NOTE(review): the PLASMA_CLIENT_* names are not used by the
+  ///         client code visible here, while Wait works with ObjectStatus_*
+  ///         values; confirm which constants this method actually reports.
+  /// @return The return status.
+  Status Info(const ObjectID& object_id, int* object_status);
+
+  /// Get the file descriptor for the socket connection to the plasma manager.
+  ///
+  /// @return The file descriptor for the manager connection. If there is no
+  ///         connection to the manager, this is -1.
+  int get_manager_fd();
+
+ private:
+  Status PerformRelease(const ObjectID& object_id);
+
+  uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
+
+  uint8_t* lookup_mmapped_file(int store_fd_val);
+
+  void increment_object_count(
+      const ObjectID& object_id, PlasmaObject* object, bool is_sealed);
+
+  /// File descriptor of the Unix domain socket that connects to the store.
+  int store_conn_;
+  /// File descriptor of the Unix domain socket that connects to the manager.
+  int manager_conn_;
+  /// Table of dlmalloc buffer files that have been memory mapped so far. This
+  /// is a hash table mapping a file descriptor to a struct containing the
+  /// address of the corresponding memory-mapped file.
+  std::unordered_map<int, ClientMmapTableEntry> mmap_table_;
+  /// A hash table of the object IDs that are currently being used by this
+  /// client.
+  std::unordered_map<ObjectID, std::unique_ptr<ObjectInUseEntry>, UniqueIDHasher>
+      objects_in_use_;
+  /// Object IDs of the last few release calls. This is a deque and
+  /// is used to delay releasing objects to see if they can be reused by
+  /// subsequent tasks so we do not unnecessarily invalidate CPU caches.
+  /// TODO(pcm): replace this with a proper lru cache using the size of the L3
+  /// cache.
+  std::deque<ObjectID> release_history_;
+  /// The number of bytes in the combined objects that are held in the release
+  /// history. If this is too large then the client starts releasing objects.
+  int64_t in_use_object_bytes_;
+  /// Configuration options for the plasma client.
+  PlasmaClientConfig config_;
+  /// The amount of memory available to the Plasma store. The client needs this
+  /// information to make sure that it does not delay in releasing so much
+  /// memory that the store is unable to evict enough objects to free up space.
+  int64_t store_capacity_;
+};
+
+/// Compute the hash of an object in the object store.
+///
+/// @param conn The object containing the connection state.
+/// @param object_id The ID of the object we want to hash.
+/// @param digest A pointer at which to return the hash digest of the object.
+///        The pointer must have at least DIGEST_SIZE bytes allocated.
+/// @return A boolean representing whether the hash operation succeeded.
+bool plasma_compute_object_hash(
+    PlasmaClient* conn, ObjectID object_id, unsigned char* digest);
+
+#endif  // PLASMA_CLIENT_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/common.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
new file mode 100644
index 0000000..a09a963
--- /dev/null
+++ b/cpp/src/plasma/common.cc
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/common.h"
+
+#include <random>
+
+#include "format/plasma_generated.h"
+
+using arrow::Status;
+
+/// Build a UniqueID whose kUniqueIDSize bytes are each drawn from a
+/// std::random_device.
+UniqueID UniqueID::from_random() {
+  UniqueID result;
+  std::random_device rng;
+  uint8_t* cursor = result.mutable_data();
+  uint8_t* const end = cursor + kUniqueIDSize;
+  // One draw per byte; the cast keeps only the low 8 bits of each draw.
+  for (; cursor != end; ++cursor) {
+    *cursor = static_cast<uint8_t>(rng());
+  }
+  return result;
+}
+
+/// Build a UniqueID from the first sizeof(UniqueID) bytes of a binary string.
+///
+/// @param binary Raw ID bytes; must hold at least sizeof(UniqueID) bytes.
+/// @return The ID whose bytes are copied from the string.
+UniqueID UniqueID::from_binary(const std::string& binary) {
+  UniqueID id;
+  // memcpy reads sizeof(id) bytes unconditionally, so a shorter string would
+  // be read past its end. Fail loudly instead of reading out of bounds.
+  ARROW_CHECK(binary.size() >= sizeof(id));
+  std::memcpy(&id, binary.data(), sizeof(id));
+  return id;
+}
+
+/// Read-only pointer to the kUniqueIDSize raw bytes of this ID.
+const uint8_t* UniqueID::data() const {
+  return id_;
+}
+
+/// Writable pointer to the kUniqueIDSize raw bytes of this ID.
+uint8_t* UniqueID::mutable_data() {
+  return id_;
+}
+
+/// Copy the raw ID bytes into a std::string of length kUniqueIDSize.
+std::string UniqueID::binary() const {
+  return std::string(reinterpret_cast<const char*>(id_), kUniqueIDSize);
+}
+
+/// Render the ID as lowercase hexadecimal, two characters per byte.
+std::string UniqueID::hex() const {
+  static const char kDigits[] = "0123456789abcdef";
+  std::string text;
+  for (const uint8_t* byte = id_; byte != id_ + kUniqueIDSize; ++byte) {
+    // High nibble first, then low nibble.
+    text += kDigits[*byte >> 4];
+    text += kDigits[*byte & 0xf];
+  }
+  return text;
+}
+
+/// Two IDs are equal when all kUniqueIDSize bytes match.
+bool UniqueID::operator==(const UniqueID& rhs) const {
+  return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
+}
+
+/// Translate a PlasmaError_* code into the matching arrow::Status.
+///
+/// @param plasma_error One of the PlasmaError_* codes.
+/// @return Status::OK() for PlasmaError_OK, otherwise the Plasma-specific
+///         error status. An unrecognized code is logged at FATAL severity.
+Status plasma_error_status(int plasma_error) {
+  if (plasma_error == PlasmaError_OK) { return Status::OK(); }
+  if (plasma_error == PlasmaError_ObjectExists) {
+    return Status::PlasmaObjectExists("object already exists in the plasma store");
+  }
+  if (plasma_error == PlasmaError_ObjectNonexistent) {
+    return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
+  }
+  if (plasma_error == PlasmaError_OutOfMemory) {
+    return Status::PlasmaStoreFull("object does not fit in the plasma store");
+  }
+  ARROW_LOG(FATAL) << "unknown plasma error code " << plasma_error;
+  // Only reached if the FATAL log above returns.
+  return Status::OK();
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/common.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
new file mode 100644
index 0000000..85dc74b
--- /dev/null
+++ b/cpp/src/plasma/common.h
@@ -0,0 +1,63 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_COMMON_H
+#define PLASMA_COMMON_H
+
+#include <cstring>
+#include <string>
+// TODO(pcm): Convert getopt and sscanf in the store to use more idiomatic C++
+// and get rid of the next three lines:
+#ifndef __STDC_FORMAT_MACROS
+#define __STDC_FORMAT_MACROS
+#endif
+
+#include "arrow/status.h"
+#include "arrow/util/logging.h"
+
+constexpr int64_t kUniqueIDSize = 20;
+
+/// Fixed-size identifier made of kUniqueIDSize raw bytes.
+class UniqueID {
+ public:
+  /// Create an ID filled with random bytes.
+  static UniqueID from_random();
+  /// Create an ID by copying raw bytes out of a binary string.
+  static UniqueID from_binary(const std::string& binary);
+  /// Byte-wise equality over all kUniqueIDSize bytes.
+  bool operator==(const UniqueID& rhs) const;
+  /// Read-only access to the raw bytes.
+  const uint8_t* data() const;
+  /// Writable access to the raw bytes.
+  uint8_t* mutable_data();
+  /// The raw bytes as a string of length kUniqueIDSize.
+  std::string binary() const;
+  /// The bytes as lowercase hexadecimal text (two characters per byte).
+  std::string hex() const;
+
+ private:
+  /// The raw bytes of the identifier.
+  uint8_t id_[kUniqueIDSize];
+};
+
+static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");
+
+/// Hash functor that lets UniqueID (and therefore ObjectID) be used as a key
+/// in unordered containers.
+struct UniqueIDHasher {
+  /// Use the leading sizeof(size_t) bytes of the ID as the hash value.
+  size_t operator()(const UniqueID& id) const {
+    size_t hash_value;
+    std::memcpy(&hash_value, id.data(), sizeof(hash_value));
+    return hash_value;
+  }
+};
+
+typedef UniqueID ObjectID;
+
+arrow::Status plasma_error_status(int plasma_error);
+
+#endif  // PLASMA_COMMON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/events.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc
new file mode 100644
index 0000000..a9f7356
--- /dev/null
+++ b/cpp/src/plasma/events.cc
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/events.h"
+
+#include <errno.h>
+
+/// Trampoline registered with ae for file events: `context` is the
+/// FileCallback stored for the descriptor, which is invoked with the event
+/// flags.
+void EventLoop::file_event_callback(
+    aeEventLoop* loop, int fd, void* context, int events) {
+  FileCallback& handler = *reinterpret_cast<FileCallback*>(context);
+  handler(events);
+}
+
+/// Trampoline registered with ae for timer events: `context` is the
+/// TimerCallback stored for the timer; its return value is passed back to ae.
+int EventLoop::timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context) {
+  TimerCallback& handler = *reinterpret_cast<TimerCallback*>(context);
+  return handler(timer_id);
+}
+
+constexpr int kInitialEventLoopSize = 1024;
+
+/// Create the underlying ae event loop with kInitialEventLoopSize slots.
+EventLoop::EventLoop() : loop_(aeCreateEventLoop(kInitialEventLoopSize)) {}
+
+/// Register `callback` for `events` on `fd`.
+///
+/// @return true if the handler was registered; false if `fd` already has a
+///         handler or ae refused the registration.
+bool EventLoop::add_file_event(int fd, int events, const FileCallback& callback) {
+  // At most one handler per file descriptor.
+  if (file_callbacks_.count(fd) != 0) { return false; }
+
+  // The loop keeps ownership of the callback; ae only sees the raw pointer.
+  std::unique_ptr<FileCallback> handler(new FileCallback(callback));
+  void* context = reinterpret_cast<void*>(handler.get());
+
+  int status =
+      aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
+  if (status == AE_ERR && errno == ERANGE) {
+    // Grow the event loop to 1.5x its current set size, then retry once.
+    status = aeResizeSetSize(loop_, 3 * aeGetSetSize(loop_) / 2);
+    if (status != AE_OK) { return false; }
+    status =
+        aeCreateFileEvent(loop_, fd, events, EventLoop::file_event_callback, context);
+  }
+
+  if (status != AE_OK) { return false; }
+  file_callbacks_.emplace(fd, std::move(handler));
+  return true;
+}
+
+/// Unregister both read and write events for `fd` from ae, then drop the
+/// callback stored for it.
+void EventLoop::remove_file_event(int fd) {
+  aeDeleteFileEvent(loop_, fd, AE_READABLE | AE_WRITABLE);
+  file_callbacks_.erase(fd);
+}
+
+/// Hand control to ae's main loop (aeMain).
+void EventLoop::run() {
+  aeMain(loop_);
+}
+
+/// Register `callback` to fire after `timeout` milliseconds.
+///
+/// @return The timer ID returned by aeCreateTimeEvent.
+int64_t EventLoop::add_timer(int64_t timeout, const TimerCallback& callback) {
+  // The loop keeps ownership of the callback; ae only sees the raw pointer.
+  std::unique_ptr<TimerCallback> handler(new TimerCallback(callback));
+  void* context = reinterpret_cast<void*>(handler.get());
+  const int64_t id =
+      aeCreateTimeEvent(loop_, timeout, EventLoop::timer_event_callback, context, NULL);
+  timer_callbacks_.emplace(id, std::move(handler));
+  return id;
+}
+
+/// Delete the timer from ae and drop the callback stored for it.
+///
+/// @return The result code of aeDeleteTimeEvent.
+int EventLoop::remove_timer(int64_t timer_id) {
+  const int status = aeDeleteTimeEvent(loop_, timer_id);
+  timer_callbacks_.erase(timer_id);
+  return status;
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/events.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h
new file mode 100644
index 0000000..bd93d6b
--- /dev/null
+++ b/cpp/src/plasma/events.h
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_EVENTS
+#define PLASMA_EVENTS
+
+#include <functional>
+#include <memory>
+#include <unordered_map>
+
+extern "C" {
+#include "ae/ae.h"
+}
+
+/// Constant specifying that the timer is done and it will be removed.
+constexpr int kEventLoopTimerDone = AE_NOMORE;
+
+/// Read event on the file descriptor.
+constexpr int kEventLoopRead = AE_READABLE;
+
+/// Write event on the file descriptor.
+constexpr int kEventLoopWrite = AE_WRITABLE;
+
+typedef long long TimerID;  // NOLINT
+
+/// C++ wrapper around an ae event loop that lets std::function callbacks be
+/// registered for file descriptors and timers.
+class EventLoop {
+ public:
+  // Signature of the handler that will be called when there is a new event
+  // on the file descriptor that this handler has been registered for.
+  //
+  // The arguments are the event flags (read or write).
+  using FileCallback = std::function<void(int)>;
+
+  // This handler will be called when a timer times out. The timer id is
+  // passed as an argument. The return is the number of milliseconds the timer
+  // shall be reset to or kEventLoopTimerDone if the timer shall not be
+  // triggered again.
+  using TimerCallback = std::function<int(int64_t)>;
+
+  /// Create the underlying ae event loop.
+  EventLoop();
+
+  /// Add a new file event handler to the event loop.
+  ///
+  /// @param fd The file descriptor we are listening to.
+  /// @param events The flags for events we are listening to (read or write).
+  /// @param callback The callback that will be called when the event happens.
+  /// @return Returns true if the event handler was added successfully.
+  bool add_file_event(int fd, int events, const FileCallback& callback);
+
+  /// Remove a file event handler from the event loop.
+  ///
+  /// @param fd The file descriptor of the event handler.
+  /// @return Void.
+  void remove_file_event(int fd);
+
+  /// Register a handler that will be called after a time slice of
+  /// "timeout" milliseconds.
+  ///
+  /// @param timeout The timeout in milliseconds.
+  /// @param callback The callback for the timeout.
+  /// @return The ID of the newly created timer.
+  int64_t add_timer(int64_t timeout, const TimerCallback& callback);
+
+  /// Remove a timer handler from the event loop.
+  ///
+  /// @param timer_id The ID of the timer that is to be removed.
+  /// @return The ae.c error code. TODO(pcm): needs to be standardized
+  int remove_timer(int64_t timer_id);
+
+  /// Run the event loop.
+  ///
+  /// @return Void.
+  void run();
+
+ private:
+  /// ae-compatible trampoline that forwards a file event to the FileCallback
+  /// passed as context.
+  static void file_event_callback(aeEventLoop* loop, int fd, void* context, int events);
+
+  /// ae-compatible trampoline that forwards a timer event to the
+  /// TimerCallback passed as context.
+  static int timer_event_callback(aeEventLoop* loop, TimerID timer_id, void* context);
+
+  /// The wrapped ae event loop.
+  aeEventLoop* loop_;
+  /// Owns the file callbacks, keyed by file descriptor; ae holds raw pointers
+  /// to these objects as its callback context.
+  std::unordered_map<int, std::unique_ptr<FileCallback>> file_callbacks_;
+  /// Owns the timer callbacks, keyed by timer ID; ae holds raw pointers to
+  /// these objects as its callback context.
+  std::unordered_map<int64_t, std::unique_ptr<TimerCallback>> timer_callbacks_;
+};
+
+#endif  // PLASMA_EVENTS

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/eviction_policy.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
new file mode 100644
index 0000000..4ae6384
--- /dev/null
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/eviction_policy.h"
+
+#include <algorithm>
+
+/// Insert `key` with the given size at the front of the LRU list. The key
+/// must not already be present.
+void LRUCache::add(const ObjectID& key, int64_t size) {
+  ARROW_CHECK(item_map_.find(key) == item_map_.end());
+  /* A std::list is required here: its iterators stay valid as other elements
+   * are inserted or erased, so they can be stored in item_map_. */
+  item_list_.emplace_front(key, size);
+  auto front = item_list_.begin();
+  item_map_.emplace(key, front);
+}
+
+/// Remove `key` from the cache. The key must be present.
+void LRUCache::remove(const ObjectID& key) {
+  auto entry = item_map_.find(key);
+  ARROW_CHECK(entry != item_map_.end());
+  /* Save the list position before the map entry that holds it is erased. */
+  auto list_position = entry->second;
+  item_map_.erase(entry);
+  item_list_.erase(list_position);
+}
+
+/// Walk the list from its back toward its front, appending object IDs to
+/// `objects_to_evict` until their sizes add up to at least
+/// `num_bytes_required` or the list is exhausted. Nothing is removed here.
+///
+/// @return The total size of the chosen objects.
+int64_t LRUCache::choose_objects_to_evict(
+    int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) {
+  int64_t total_chosen = 0;
+  for (auto candidate = item_list_.rbegin();
+       total_chosen < num_bytes_required && candidate != item_list_.rend();
+       ++candidate) {
+    objects_to_evict->push_back(candidate->first);
+    total_chosen += candidate->second;
+  }
+  return total_chosen;
+}
+
+/// Start with zero bytes accounted as used and remember the store info that
+/// later calls consult for object sizes and memory capacity.
+EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info)
+    : memory_used_(0), store_info_(store_info) {}
+
+/// Pick objects to evict from the LRU cache, drop them from the cache, and
+/// subtract their total size from the used-memory counter.
+///
+/// @return The number of bytes accounted as evicted.
+int64_t EvictionPolicy::choose_objects_to_evict(
+    int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict) {
+  const int64_t freed =
+      cache_.choose_objects_to_evict(num_bytes_required, objects_to_evict);
+  /* Every ID in the output vector leaves the LRU cache. */
+  for (auto id = objects_to_evict->begin(); id != objects_to_evict->end(); ++id) {
+    cache_.remove(*id);
+  }
+  /* The evicted bytes no longer count as used. */
+  memory_used_ -= freed;
+  return freed;
+}
+
+/// Add a newly created object to the LRU cache, sized as data plus metadata.
+void EvictionPolicy::object_created(const ObjectID& object_id) {
+  auto table_entry = store_info_->objects[object_id].get();
+  const auto& info = table_entry->info;
+  cache_.add(object_id, info.data_size + info.metadata_size);
+}
+
+/// Reserve `size` bytes, evicting objects if the store would otherwise exceed
+/// its memory capacity.
+///
+/// @param size Number of bytes needed.
+/// @param objects_to_evict Receives the IDs of the objects chosen for
+///        eviction.
+/// @return true if enough space is available after eviction; only then is
+///         `size` added to the used-memory counter.
+bool EvictionPolicy::require_space(
+    int64_t size, std::vector<ObjectID>* objects_to_evict) {
+  /* How far over capacity we would be after adding the object. */
+  const int64_t shortfall = memory_used_ + size - store_info_->memory_capacity;
+  int64_t freed = 0;
+  if (shortfall > 0) {
+    /* Free at least the requested size, but aim for 20% of the total
+     * capacity when that is larger. */
+    int64_t space_to_free = std::max(size, store_info_->memory_capacity / 5);
+    ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects";
+    freed = choose_objects_to_evict(space_to_free, objects_to_evict);
+    ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting "
+                    << objects_to_evict->size() << " objects to free up "
+                    << freed << " bytes.";
+  }
+  const bool has_room = freed >= shortfall;
+  /* The space only counts as used when the object actually fits. */
+  if (has_room) { memory_used_ += size; }
+  return has_room;
+}
+
+/// Called when an object starts being accessed. `objects_to_evict` is not
+/// used by this implementation.
+void EvictionPolicy::begin_object_access(
+    const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) {
+  /* Take the object out of the LRU cache so that it cannot be chosen for
+   * eviction. The object must currently be in the cache: LRUCache::remove
+   * CHECK-fails on a missing key. */
+  cache_.remove(object_id);
+}
+
+/// Called when an object stops being accessed: put it back into the LRU
+/// cache, sized as data plus metadata. `objects_to_evict` is not used.
+void EvictionPolicy::end_object_access(
+    const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict) {
+  auto table_entry = store_info_->objects[object_id].get();
+  const auto& info = table_entry->info;
+  /* The object becomes eligible for eviction again. */
+  cache_.add(object_id, info.data_size + info.metadata_size);
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/eviction_policy.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
new file mode 100644
index 0000000..3815fc6
--- /dev/null
+++ b/cpp/src/plasma/eviction_policy.h
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_EVICTION_POLICY_H
+#define PLASMA_EVICTION_POLICY_H
+
+#include <list>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "plasma/common.h"
+#include "plasma/plasma.h"
+
+// ==== The eviction policy ====
+//
+// This file contains the declarations of all functions and data structures
+// that need to be provided if you want to implement a new eviction algorithm
+// for the Plasma store.
+
+/// Bookkeeping structure that keeps object IDs together with their sizes in
+/// LRU order. The method implementations live outside this header.
+class LRUCache {
+ public:
+  LRUCache() {}
+
+  /// Insert an object into the cache.
+  ///
+  /// @param key The ID of the object to insert.
+  /// @param size The size in bytes associated with the object.
+  void add(const ObjectID& key, int64_t size);
+
+  /// Take an object out of the cache.
+  ///
+  /// @param key The ID of the object to take out.
+  void remove(const ObjectID& key);
+
+  /// Pick objects from the cache for eviction.
+  /// NOTE(review): presumably picks in LRU order and returns the number of
+  /// bytes chosen -- confirm against the implementation.
+  ///
+  /// @param num_bytes_required The number of bytes of space requested.
+  /// @param objects_to_evict Output vector for the chosen object IDs.
+  int64_t choose_objects_to_evict(
+      int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict);
+
+ private:
+  /// Doubly-linked list of (object ID, size) pairs, kept in LRU order.
+  using ItemList = std::list<std::pair<ObjectID, int64_t>>;
+  ItemList item_list_;
+  /// Hash table from the ID of a cached object to that object's position
+  /// inside item_list_.
+  std::unordered_map<ObjectID, ItemList::iterator, UniqueIDHasher> item_map_;
+};
+
+/// The eviction policy. It tracks how much memory the store is using and keeps
+/// an LRU cache of objects from which eviction candidates are chosen.
+class EvictionPolicy {
+ public:
+  /// Construct an eviction policy.
+  ///
+  /// @param store_info Information about the Plasma store that is exposed
+  ///        to the eviction policy.
+  explicit EvictionPolicy(PlasmaStoreInfo* store_info);
+
+  /// This method will be called whenever an object is first created in order to
+  /// add it to the LRU cache. This is done so that the first time the Plasma
+  /// store calls begin_object_access, we can remove the object from the LRU
+  /// cache.
+  ///
+  /// @param object_id The object ID of the object that was created.
+  /// @return Void.
+  void object_created(const ObjectID& object_id);
+
+  /// This method will be called when the Plasma store needs more space, perhaps
+  /// to create a new object. If the required amount of space cannot be freed
+  /// up, this method returns false (it does not raise a fatal error) and the
+  /// recorded memory usage is left unchanged; on success the recorded memory
+  /// usage is increased by size. When eviction is needed, the policy tries to
+  /// free the larger of size and 20% of the store's memory capacity. When this
+  /// method is called, the eviction policy will assume that the objects chosen
+  /// to be evicted will in fact be evicted from the Plasma store by the caller.
+  ///
+  /// @param size The size in bytes of the new object, including both data and
+  ///        metadata.
+  /// @param objects_to_evict The object IDs that were chosen for eviction will
+  ///        be stored into this vector.
+  /// @return True if enough space can be freed and false otherwise.
+  bool require_space(int64_t size, std::vector<ObjectID>* objects_to_evict);
+
+  /// This method will be called whenever an unused object in the Plasma store
+  /// starts to be used. The object is removed from the LRU cache.
+  ///
+  /// @param object_id The ID of the object that is now being used.
+  /// @param objects_to_evict Output vector for object IDs chosen for eviction.
+  ///        The current implementation never adds anything to it.
+  /// @return Void.
+  void begin_object_access(
+      const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict);
+
+  /// This method will be called whenever an object in the Plasma store that was
+  /// being used is no longer being used. The object is added to the LRU cache
+  /// with a size equal to its data size plus its metadata size.
+  ///
+  /// @param object_id The ID of the object that is no longer being used.
+  /// @param objects_to_evict Output vector for object IDs chosen for eviction.
+  ///        The current implementation never adds anything to it.
+  /// @return Void.
+  void end_object_access(
+      const ObjectID& object_id, std::vector<ObjectID>* objects_to_evict);
+
+  /// Choose some objects to evict from the Plasma store. When this method is
+  /// called, the eviction policy will assume that the objects chosen to be
+  /// evicted will in fact be evicted from the Plasma store by the caller.
+  ///
+  /// @note This method is not part of the API. It is exposed in the header file
+  /// only for testing.
+  ///
+  /// @param num_bytes_required The number of bytes of space to try to free up.
+  /// @param objects_to_evict The object IDs that were chosen for eviction will
+  ///        be stored into this vector.
+  /// @return The total number of bytes of space chosen to be evicted.
+  int64_t choose_objects_to_evict(
+      int64_t num_bytes_required, std::vector<ObjectID>* objects_to_evict);
+
+ private:
+  /// The amount of memory (in bytes) currently being used.
+  int64_t memory_used_;
+  /// Pointer to the plasma store info.
+  PlasmaStoreInfo* store_info_;
+  /// Data structure for the LRU cache.
+  LRUCache cache_;
+};
+
+#endif  // PLASMA_EVICTION_POLICY_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/extension.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.cc b/cpp/src/plasma/extension.cc
new file mode 100644
index 0000000..5d61e33
--- /dev/null
+++ b/cpp/src/plasma/extension.cc
@@ -0,0 +1,456 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "plasma/extension.h"
+
+#include <algorithm>
+#include <vector>
+
+#include "plasma/client.h"
+#include "plasma/common.h"
+#include "plasma/io.h"
+#include "plasma/protocol.h"
+
+/* Python exception set by PyPlasma_create when the store reports that it is
+ * full. The exception object is created in the module init function. */
+PyObject* PlasmaOutOfMemoryError;
+/* Python exception set by PyPlasma_create when an object with the requested
+ * ID already exists. The exception object is created in the module init
+ * function. */
+PyObject* PlasmaObjectExistsError;
+
+/* Python binding: connect(store_socket_name, manager_socket_name,
+ * release_delay) -> capsule.
+ *
+ * Creates a PlasmaClient, connects it, and returns it wrapped in a capsule
+ * named "plasma". The capsule is created without a destructor, so this
+ * function does not arrange for the client to be freed. A failed Connect is
+ * handled by ARROW_CHECK_OK rather than by raising a Python exception. */
+PyObject* PyPlasma_connect(PyObject* self, PyObject* args) {
+  const char* store_socket_name;
+  const char* manager_socket_name;
+  int release_delay;
+  const int parsed = PyArg_ParseTuple(
+      args, "ssi", &store_socket_name, &manager_socket_name, &release_delay);
+  if (parsed == 0) { return NULL; }
+
+  PlasmaClient* client = new PlasmaClient();
+  ARROW_CHECK_OK(client->Connect(store_socket_name, manager_socket_name, release_delay));
+
+  PyObject* capsule = PyCapsule_New(client, "plasma", NULL);
+  return capsule;
+}
+
+/* Python binding: disconnect(client_capsule) -> None.
+ *
+ * Disconnects the client stored in the capsule and then marks the capsule as
+ * closed. */
+PyObject* PyPlasma_disconnect(PyObject* self, PyObject* args) {
+  PyObject* client_capsule;
+  const int parsed = PyArg_ParseTuple(args, "O", &client_capsule);
+  if (parsed == 0) { return NULL; }
+
+  PlasmaClient* client;
+  ARROW_CHECK(PyObjectToPlasmaClient(client_capsule, &client));
+  ARROW_CHECK_OK(client->Disconnect());
+
+  /* The capsule context records the connection state: NULL means the
+   * connection is still active, (void*) 0x1 means it has been closed. This is
+   * necessary because the primary pointer of a capsule cannot be NULL. */
+  void* closed_marker = reinterpret_cast<void*>(0x1);
+  PyCapsule_SetContext(client_capsule, closed_marker);
+  Py_RETURN_NONE;
+}
+
+/* Python binding: create(client, object_id, size, metadata) -> buffer.
+ *
+ * Asks the store to create an object of `size` bytes with the given
+ * bytearray metadata, and returns a writable view of the object's data
+ * (a memoryview on Python 3, a read-write buffer on Python 2).
+ *
+ * Sets TypeError if metadata is not a bytearray, PlasmaObjectExistsError if
+ * the ID is already in use, and PlasmaOutOfMemoryError if the store is full.
+ * Any other failure status is handled by ARROW_CHECK. */
+PyObject* PyPlasma_create(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  Py_ssize_t size;
+  PyObject* metadata;
+  const int parsed = PyArg_ParseTuple(args, "O&O&nO", PyObjectToPlasmaClient, &client,
+      PyStringToUniqueID, &object_id, &size, &metadata);
+  if (parsed == 0) { return NULL; }
+  if (!PyByteArray_Check(metadata)) {
+    PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray");
+    return NULL;
+  }
+
+  uint8_t* metadata_bytes = reinterpret_cast<uint8_t*>(PyByteArray_AsString(metadata));
+  Py_ssize_t metadata_size = PyByteArray_Size(metadata);
+  uint8_t* data;
+  Status s = client->Create(object_id, size, metadata_bytes, metadata_size, &data);
+
+  /* Translate the two expected failure modes into Python exceptions. */
+  if (s.IsPlasmaObjectExists()) {
+    PyErr_SetString(PlasmaObjectExistsError,
+        "An object with this ID already exists in the plasma "
+        "store.");
+    return NULL;
+  } else if (s.IsPlasmaStoreFull()) {
+    PyErr_SetString(PlasmaOutOfMemoryError,
+        "The plasma store ran out of memory and could not create "
+        "this object.");
+    return NULL;
+  }
+  ARROW_CHECK(s.ok());
+
+#if PY_MAJOR_VERSION >= 3
+  return PyMemoryView_FromMemory(reinterpret_cast<char*>(data), size, PyBUF_WRITE);
+#else
+  return PyBuffer_FromReadWriteMemory(reinterpret_cast<void*>(data), size);
+#endif
+}
+
+/* Python binding: hash(client, object_id) -> bytes or None.
+ *
+ * Returns the kDigestSize-byte digest produced by plasma_compute_object_hash
+ * as a bytes object, or None when that function reports failure. */
+PyObject* PyPlasma_hash(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  const int parsed = PyArg_ParseTuple(
+      args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+  if (parsed == 0) { return NULL; }
+
+  unsigned char digest[kDigestSize];
+  if (!plasma_compute_object_hash(client, object_id, digest)) { Py_RETURN_NONE; }
+  return PyBytes_FromStringAndSize(reinterpret_cast<char*>(digest), kDigestSize);
+}
+
+/* Python binding: seal(client, object_id) -> None.
+ *
+ * Calls Seal on the client for the given object. A failure status is handled
+ * by ARROW_CHECK_OK rather than by raising a Python exception. */
+PyObject* PyPlasma_seal(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  const int parsed = PyArg_ParseTuple(
+      args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+  if (parsed == 0) { return NULL; }
+
+  ARROW_CHECK_OK(client->Seal(object_id));
+  Py_RETURN_NONE;
+}
+
+/* Python binding: release(client, object_id) -> None.
+ *
+ * Calls Release on the client for the given object. A failure status is
+ * handled by ARROW_CHECK_OK rather than by raising a Python exception. */
+PyObject* PyPlasma_release(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  const int parsed = PyArg_ParseTuple(
+      args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+  if (parsed == 0) { return NULL; }
+
+  ARROW_CHECK_OK(client->Release(object_id));
+  Py_RETURN_NONE;
+}
+
+/* Python binding: get(client, object_ids, timeout_ms) -> list.
+ *
+ * Requests every object in the list `object_ids` from the client. The result
+ * is a list of the same length: entry i is a (data, metadata) tuple of
+ * read-only buffers when object i was retrieved, and None when it was not
+ * (signalled by data_size == -1).
+ *
+ * Sets TypeError when `object_ids` is not a list or when an element cannot be
+ * converted to an object ID. A failure status from Get is handled by
+ * ARROW_CHECK_OK. */
+PyObject* PyPlasma_get(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  PyObject* object_id_list;
+  Py_ssize_t timeout_ms;
+  if (!PyArg_ParseTuple(
+          args, "O&On", PyObjectToPlasmaClient, &client, &object_id_list, &timeout_ms)) {
+    return NULL;
+  }
+  /* Without this check PyList_Size would return -1 for a non-list and the
+   * vectors below would be constructed with a bogus size. */
+  if (!PyList_Check(object_id_list)) {
+    PyErr_SetString(PyExc_TypeError, "object_ids must be a list");
+    return NULL;
+  }
+
+  Py_ssize_t num_object_ids = PyList_Size(object_id_list);
+  std::vector<ObjectID> object_ids(num_object_ids);
+  std::vector<ObjectBuffer> object_buffers(num_object_ids);
+
+  for (Py_ssize_t i = 0; i < num_object_ids; ++i) {
+    /* PyStringToUniqueID sets a TypeError itself when it returns 0. */
+    if (!PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i])) {
+      return NULL;
+    }
+  }
+
+  /* Drop the global interpreter lock while the request is in flight. */
+  Py_BEGIN_ALLOW_THREADS;
+  ARROW_CHECK_OK(
+      client->Get(object_ids.data(), num_object_ids, timeout_ms, object_buffers.data()));
+  Py_END_ALLOW_THREADS;
+
+  PyObject* returns = PyList_New(num_object_ids);
+  if (returns == NULL) { return NULL; }
+  for (Py_ssize_t i = 0; i < num_object_ids; ++i) {
+    if (object_buffers[i].data_size != -1) {
+      /* The object was retrieved, so return the object. */
+      PyObject* t = PyTuple_New(2);
+      if (t == NULL) {
+        Py_DECREF(returns);
+        return NULL;
+      }
+      Py_ssize_t data_size = static_cast<Py_ssize_t>(object_buffers[i].data_size);
+      Py_ssize_t metadata_size = static_cast<Py_ssize_t>(object_buffers[i].metadata_size);
+#if PY_MAJOR_VERSION >= 3
+      char* data = reinterpret_cast<char*>(object_buffers[i].data);
+      char* metadata = reinterpret_cast<char*>(object_buffers[i].metadata);
+      PyTuple_SET_ITEM(t, 0, PyMemoryView_FromMemory(data, data_size, PyBUF_READ));
+      PyTuple_SET_ITEM(
+          t, 1, PyMemoryView_FromMemory(metadata, metadata_size, PyBUF_READ));
+#else
+      void* data = reinterpret_cast<void*>(object_buffers[i].data);
+      void* metadata = reinterpret_cast<void*>(object_buffers[i].metadata);
+      PyTuple_SET_ITEM(t, 0, PyBuffer_FromMemory(data, data_size));
+      PyTuple_SET_ITEM(t, 1, PyBuffer_FromMemory(metadata, metadata_size));
+#endif
+      /* PyList_SetItem takes over the reference to t. */
+      ARROW_CHECK(PyList_SetItem(returns, i, t) == 0);
+    } else {
+      /* The object was not retrieved, so just add None to the list of return
+       * values. PyList_SetItem takes over the extra reference to None. */
+      Py_INCREF(Py_None);
+      ARROW_CHECK(PyList_SetItem(returns, i, Py_None) == 0);
+    }
+  }
+  return returns;
+}
+
+/* Python binding: contains(client, object_id) -> bool.
+ *
+ * Returns True or False according to the flag filled in by the client's
+ * Contains call. A failure status is handled by ARROW_CHECK_OK. */
+PyObject* PyPlasma_contains(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  const int parsed = PyArg_ParseTuple(
+      args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+  if (parsed == 0) { return NULL; }
+
+  bool has_object;
+  ARROW_CHECK_OK(client->Contains(object_id, &has_object));
+
+  /* PyBool_FromLong hands back a new reference to Py_True or Py_False. */
+  return PyBool_FromLong(has_object ? 1L : 0L);
+}
+
+/* Python binding: fetch(client, object_ids) -> None.
+ *
+ * Passes the IDs in the list `object_ids` to the client's Fetch call.
+ *
+ * Sets RuntimeError when the client has no manager connection, and TypeError
+ * when `object_ids` is not a list or when an element cannot be converted to
+ * an object ID. A failure status from Fetch is handled by ARROW_CHECK_OK. */
+PyObject* PyPlasma_fetch(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  PyObject* object_id_list;
+  if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaClient, &client, &object_id_list)) {
+    return NULL;
+  }
+  if (client->get_manager_fd() == -1) {
+    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
+    return NULL;
+  }
+  /* Without this check PyList_Size would return -1 for a non-list and be used
+   * as an array length. */
+  if (!PyList_Check(object_id_list)) {
+    PyErr_SetString(PyExc_TypeError, "object_ids must be a list");
+    return NULL;
+  }
+  Py_ssize_t n = PyList_Size(object_id_list);
+  /* A vector owns the storage, so every early return below frees it. */
+  std::vector<ObjectID> object_ids(n);
+  for (Py_ssize_t i = 0; i < n; ++i) {
+    /* PyStringToUniqueID sets a TypeError itself when it returns 0. */
+    if (!PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i])) {
+      return NULL;
+    }
+  }
+  ARROW_CHECK_OK(client->Fetch(static_cast<int>(n), object_ids.data()));
+  Py_RETURN_NONE;
+}
+
+/* Python binding: wait(client, object_ids, timeout, num_returns)
+ *     -> (ready_ids, waiting_ids).
+ *
+ * Waits through the client's Wait call on the IDs in the list `object_ids`.
+ * Returns a tuple whose first element is a list of at most `num_returns` IDs
+ * whose status is local or remote, and whose second element is a set
+ * holding the remaining IDs.
+ *
+ * Sets TypeError when `object_ids` is not a list or an element cannot be
+ * converted to an object ID, and RuntimeError when the client has no manager
+ * connection, when `num_returns` is negative or larger than the list, or
+ * when `timeout` exceeds 2 ** 30. A failure status from Wait is handled by
+ * ARROW_CHECK_OK. */
+PyObject* PyPlasma_wait(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  PyObject* object_id_list;
+  Py_ssize_t timeout;
+  int num_returns;
+  if (!PyArg_ParseTuple(args, "O&Oni", PyObjectToPlasmaClient, &client, &object_id_list,
+          &timeout, &num_returns)) {
+    return NULL;
+  }
+  /* Without this check PyList_Size would return -1 for a non-list and the
+   * caller would get a misleading num_returns error. */
+  if (!PyList_Check(object_id_list)) {
+    PyErr_SetString(PyExc_TypeError, "object_ids must be a list");
+    return NULL;
+  }
+  Py_ssize_t n = PyList_Size(object_id_list);
+
+  if (client->get_manager_fd() == -1) {
+    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
+    return NULL;
+  }
+  if (num_returns < 0) {
+    PyErr_SetString(
+        PyExc_RuntimeError, "The argument num_returns cannot be less than zero.");
+    return NULL;
+  }
+  if (num_returns > n) {
+    PyErr_SetString(PyExc_RuntimeError,
+        "The argument num_returns cannot be greater than len(object_ids)");
+    return NULL;
+  }
+  int64_t threshold = 1 << 30;
+  if (timeout > threshold) {
+    PyErr_SetString(
+        PyExc_RuntimeError, "The argument timeout cannot be greater than 2 ** 30.");
+    return NULL;
+  }
+
+  std::vector<ObjectRequest> object_requests(n);
+  for (Py_ssize_t i = 0; i < n; ++i) {
+    /* A bad element is reported to Python instead of aborting the process;
+     * PyStringToUniqueID sets a TypeError itself when it returns 0. */
+    if (!PyStringToUniqueID(
+            PyList_GetItem(object_id_list, i), &object_requests[i].object_id)) {
+      return NULL;
+    }
+    object_requests[i].type = PLASMA_QUERY_ANYWHERE;
+  }
+  /* Drop the global interpreter lock while we are waiting, so other threads can
+   * run. */
+  int num_return_objects;
+  Py_BEGIN_ALLOW_THREADS;
+  ARROW_CHECK_OK(
+      client->Wait(n, object_requests.data(), num_returns, timeout, &num_return_objects));
+  Py_END_ALLOW_THREADS;
+
+  int num_to_return = std::min(num_return_objects, num_returns);
+  PyObject* ready_ids = PyList_New(num_to_return);
+  if (ready_ids == NULL) { return NULL; }
+  /* Start from the set of all requested IDs and discard the ready ones. */
+  PyObject* waiting_ids = PySet_New(object_id_list);
+  if (waiting_ids == NULL) {
+    Py_DECREF(ready_ids);
+    return NULL;
+  }
+  int num_returned = 0;
+  for (Py_ssize_t i = 0; i < n; ++i) {
+    if (num_returned == num_to_return) { break; }
+    if (object_requests[i].status == ObjectStatus_Local ||
+        object_requests[i].status == ObjectStatus_Remote) {
+      PyObject* ready = PyBytes_FromStringAndSize(
+          reinterpret_cast<char*>(&object_requests[i].object_id),
+          sizeof(object_requests[i].object_id));
+      if (ready == NULL) {
+        Py_DECREF(ready_ids);
+        Py_DECREF(waiting_ids);
+        return NULL;
+      }
+      /* The list takes over the reference to ready and keeps it alive for the
+       * PySet_Discard call. */
+      PyList_SetItem(ready_ids, num_returned, ready);
+      PySet_Discard(waiting_ids, ready);
+      num_returned += 1;
+    } else {
+      ARROW_CHECK(object_requests[i].status == ObjectStatus_Nonexistent);
+    }
+  }
+  ARROW_CHECK(num_returned == num_to_return);
+  /* Return both the ready IDs and the remaining IDs. */
+  PyObject* t = PyTuple_New(2);
+  if (t == NULL) {
+    Py_DECREF(ready_ids);
+    Py_DECREF(waiting_ids);
+    return NULL;
+  }
+  PyTuple_SetItem(t, 0, ready_ids);
+  PyTuple_SetItem(t, 1, waiting_ids);
+  return t;
+}
+
+/* Python binding: evict(client, num_bytes) -> int.
+ *
+ * Calls Evict on the client with the requested byte count and returns the
+ * byte count that call reports back. A failure status is handled by
+ * ARROW_CHECK_OK. */
+PyObject* PyPlasma_evict(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  Py_ssize_t num_bytes;
+  const int parsed =
+      PyArg_ParseTuple(args, "O&n", PyObjectToPlasmaClient, &client, &num_bytes);
+  if (parsed == 0) { return NULL; }
+
+  /* Filled in by Evict. */
+  int64_t evicted_bytes;
+  ARROW_CHECK_OK(client->Evict(static_cast<int64_t>(num_bytes), evicted_bytes));
+
+  Py_ssize_t result = static_cast<Py_ssize_t>(evicted_bytes);
+  return PyLong_FromSsize_t(result);
+}
+
+/* Python binding: delete(client, object_id) -> None.
+ *
+ * Calls Delete on the client for the given object. A failure status is
+ * handled by ARROW_CHECK_OK rather than by raising a Python exception. */
+PyObject* PyPlasma_delete(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  const int parsed = PyArg_ParseTuple(
+      args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID, &object_id);
+  if (parsed == 0) { return NULL; }
+
+  ARROW_CHECK_OK(client->Delete(object_id));
+  Py_RETURN_NONE;
+}
+
+/* Python binding: transfer(client, object_id, addr, port) -> None.
+ *
+ * Calls Transfer on the client with the given address, port and object ID.
+ * Sets RuntimeError when the client has no manager connection. A failure
+ * status from Transfer is handled by ARROW_CHECK_OK. */
+PyObject* PyPlasma_transfer(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  ObjectID object_id;
+  const char* addr;
+  int port;
+  const int parsed = PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaClient, &client,
+      PyStringToUniqueID, &object_id, &addr, &port);
+  if (parsed == 0) { return NULL; }
+
+  const bool manager_connected = client->get_manager_fd() != -1;
+  if (!manager_connected) {
+    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
+    return NULL;
+  }
+
+  ARROW_CHECK_OK(client->Transfer(addr, port, object_id));
+  Py_RETURN_NONE;
+}
+
+/* Python binding: subscribe(client) -> int.
+ *
+ * Calls Subscribe on the client and returns the socket value it fills in as
+ * a Python integer. A failure status is handled by ARROW_CHECK_OK. */
+PyObject* PyPlasma_subscribe(PyObject* self, PyObject* args) {
+  PlasmaClient* client;
+  const int parsed = PyArg_ParseTuple(args, "O&", PyObjectToPlasmaClient, &client);
+  if (parsed == 0) { return NULL; }
+
+  int sock;
+  ARROW_CHECK_OK(client->Subscribe(&sock));
+  return PyLong_FromLong(static_cast<long>(sock));
+}
+
+/* Python binding: receive_notification(plasma_sock)
+ *     -> (object_id, data_size, metadata_size).
+ *
+ * Reads one object notification from the given socket. For a deletion the
+ * two sizes are reported as -1; otherwise they are taken from the
+ * notification. Sets RuntimeError when no notification could be read. */
+PyObject* PyPlasma_receive_notification(PyObject* self, PyObject* args) {
+  int plasma_sock;
+
+  if (!PyArg_ParseTuple(args, "i", &plasma_sock)) { return NULL; }
+  /* Receive object notification from the plasma connection socket. If the
+   * object was added, return a tuple of its fields: ObjectID, data_size,
+   * metadata_size. If the object was deleted, data_size and metadata_size will
+   * be set to -1. */
+  uint8_t* notification = read_message_async(plasma_sock);
+  if (notification == NULL) {
+    PyErr_SetString(
+        PyExc_RuntimeError, "Failed to read object notification from Plasma socket");
+    return NULL;
+  }
+  auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
+  /* Construct a tuple from object_info and return. */
+  PyObject* t = PyTuple_New(3);
+  if (t == NULL) {
+    /* Do not leak the message buffer when the tuple cannot be allocated. */
+    delete[] notification;
+    return NULL;
+  }
+  PyTuple_SetItem(t, 0, PyBytes_FromStringAndSize(object_info->object_id()->data(),
+                            object_info->object_id()->size()));
+  if (object_info->is_deletion()) {
+    PyTuple_SetItem(t, 1, PyLong_FromLong(-1));
+    PyTuple_SetItem(t, 2, PyLong_FromLong(-1));
+  } else {
+    /* Convert through long long so that sizes are not truncated on platforms
+     * where long is only 32 bits wide. */
+    PyTuple_SetItem(t, 1, PyLong_FromLongLong(object_info->data_size()));
+    PyTuple_SetItem(t, 2, PyLong_FromLongLong(object_info->metadata_size()));
+  }
+
+  /* object_info points into the notification buffer, so the buffer is only
+   * released once the tuple has been filled in. */
+  delete[] notification;
+  return t;
+}
+
+/* Method table of the libplasma module. Each entry gives the Python-visible
+ * name, the C function implementing it, the calling convention (all entries
+ * use METH_VARARGS) and the docstring. */
+static PyMethodDef plasma_methods[] = {
+    {"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."},
+    {"disconnect", PyPlasma_disconnect, METH_VARARGS, "Disconnect from plasma."},
+    {"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."},
+    {"hash", PyPlasma_hash, METH_VARARGS, "Compute the hash of a plasma object."},
+    {"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."},
+    {"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."},
+    {"contains", PyPlasma_contains, METH_VARARGS,
+        "Does the plasma store contain this plasma object?"},
+    {"fetch", PyPlasma_fetch, METH_VARARGS,
+        "Fetch the object from another plasma manager instance."},
+    {"wait", PyPlasma_wait, METH_VARARGS,
+        "Wait until num_returns objects in object_ids are ready."},
+    {"evict", PyPlasma_evict, METH_VARARGS,
+        "Evict some objects until we recover some number of bytes."},
+    {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."},
+    {"delete", PyPlasma_delete, METH_VARARGS, "Delete a plasma object."},
+    {"transfer", PyPlasma_transfer, METH_VARARGS,
+        "Transfer object to another plasma manager."},
+    {"subscribe", PyPlasma_subscribe, METH_VARARGS,
+        "Subscribe to the plasma notification socket."},
+    {"receive_notification", PyPlasma_receive_notification, METH_VARARGS,
+        "Receive next notification from plasma notification socket."},
+    /* Sentinel: an all-zero entry terminates the table. */
+    {NULL, NULL, 0, NULL}
+};
+
+#if PY_MAJOR_VERSION >= 3
+/* Python 3 module definition handed to PyModule_Create in the init
+ * function. The Python 2 build passes the same name, method table and
+ * docstring to Py_InitModule3 instead. */
+static PyModuleDef moduledef = {
+    PyModuleDef_HEAD_INIT,
+    "libplasma",                           /* m_name */
+    "A Python client library for plasma.", /* m_doc */
+    0,                                     /* m_size */
+    plasma_methods,                        /* m_methods */
+    NULL,                                  /* m_reload */
+    NULL,                                  /* m_traverse */
+    NULL,                                  /* m_clear */
+    NULL,                                  /* m_free */
+};
+#endif
+
+/* INITERROR is the statement that leaves the module init function early:
+ * the Python 3 init function returns a value, so it expands to `return NULL`;
+ * otherwise it expands to a bare `return`. */
+#if PY_MAJOR_VERSION >= 3
+#define INITERROR return NULL
+#else
+#define INITERROR return
+#endif
+
+/* Fallback for when the Python headers did not define PyMODINIT_FUNC. */
+#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
+#define PyMODINIT_FUNC void
+#endif
+
+/* MOD_INIT(name) expands to the signature of the module init function:
+ * PyInit_<name> on Python 3 and init<name> otherwise. */
+#if PY_MAJOR_VERSION >= 3
+#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
+#else
+#define MOD_INIT(name) PyMODINIT_FUNC init##name(void)
+#endif
+
+/* Module init function for libplasma. Creates the module and registers the
+ * two custom exceptions on it as `plasma_object_exists_error` and
+ * `plasma_out_of_memory_error`. Leaves through INITERROR if the module or
+ * either exception cannot be created. */
+MOD_INIT(libplasma) {
+#if PY_MAJOR_VERSION >= 3
+  PyObject* m = PyModule_Create(&moduledef);
+#else
+  PyObject* m =
+      Py_InitModule3("libplasma", plasma_methods, "A Python client library for plasma.");
+#endif
+  /* Module creation can fail; do not touch a NULL module below. */
+  if (m == NULL) { INITERROR; }
+
+  /* Create a custom exception for when an object ID is reused. */
+  char plasma_object_exists_error[] = "plasma_object_exists.error";
+  PlasmaObjectExistsError = PyErr_NewException(plasma_object_exists_error, NULL, NULL);
+  if (PlasmaObjectExistsError == NULL) { INITERROR; }
+  /* PyModule_AddObject takes over one reference on success; the extra
+   * reference keeps the global pointer valid. */
+  Py_INCREF(PlasmaObjectExistsError);
+  PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError);
+  /* Create a custom exception for when the plasma store is out of memory. */
+  char plasma_out_of_memory_error[] = "plasma_out_of_memory.error";
+  PlasmaOutOfMemoryError = PyErr_NewException(plasma_out_of_memory_error, NULL, NULL);
+  if (PlasmaOutOfMemoryError == NULL) { INITERROR; }
+  Py_INCREF(PlasmaOutOfMemoryError);
+  PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError);
+
+#if PY_MAJOR_VERSION >= 3
+  return m;
+#endif
+}

http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/extension.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.h b/cpp/src/plasma/extension.h
new file mode 100644
index 0000000..cee30ab
--- /dev/null
+++ b/cpp/src/plasma/extension.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_EXTENSION_H
+#define PLASMA_EXTENSION_H
+
+#undef _XOPEN_SOURCE
+#undef _POSIX_C_SOURCE
+#include <Python.h>
+
+#include "bytesobject.h"  // NOLINT
+
+#include "plasma/client.h"
+#include "plasma/common.h"
+
+/* Converter for the PyArg_ParseTuple "O&" format: extracts the PlasmaClient
+ * pointer stored in a capsule named "plasma".
+ *
+ * Returns 1 and stores the pointer in *client on success; returns 0 with a
+ * TypeError set when the object is not a valid "plasma" capsule. */
+static int PyObjectToPlasmaClient(PyObject* object, PlasmaClient** client) {
+  if (!PyCapsule_IsValid(object, "plasma")) {
+    PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule");
+    return 0;
+  }
+  void* raw_client = PyCapsule_GetPointer(object, "plasma");
+  *client = reinterpret_cast<PlasmaClient*>(raw_client);
+  return 1;
+}
+
+/* Converter for the PyArg_ParseTuple "O&" format: copies the contents of a
+ * bytes object into *object_id.
+ *
+ * Returns 1 on success; returns 0 with a TypeError set when the object is
+ * NULL, is not a bytes object, or does not hold exactly sizeof(ObjectID)
+ * bytes. The length check keeps memcpy from reading past the end of a bytes
+ * object that is shorter than an object ID.
+ *
+ * NOTE(review): this function is defined in a header without `static` or
+ * `inline`; including the header from more than one translation unit would
+ * produce duplicate definitions -- confirm it is only included once. */
+int PyStringToUniqueID(PyObject* object, ObjectID* object_id) {
+  if (object == NULL || !PyBytes_Check(object) ||
+      PyBytes_Size(object) != static_cast<Py_ssize_t>(sizeof(ObjectID))) {
+    PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
+    return 0;
+  }
+  memcpy(object_id, PyBytes_AsString(object), sizeof(ObjectID));
+  return 1;
+}
+
+#endif  // PLASMA_EXTENSION_H