You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/24 16:12:49 UTC

[2/2] arrow git commit: ARROW-1149: [Plasma] Create Cython client library for Plasma

ARROW-1149: [Plasma] Create Cython client library for Plasma

This PR introduces a Cython API to Plasma, a FindPlasma.cmake to make it easier to integrate Plasma with CMake projects and sets up packaging with pyarrow.

Author: Philipp Moritz <pc...@gmail.com>
Author: Robert Nishihara <ro...@gmail.com>

Closes #797 from pcmoritz/plasma-cython and squashes the following commits:

d8319fc [Philipp Moritz] get for of PlasmaClient.connect
d14ab87 [Philipp Moritz] get rid of MutableBuffer
08f24a5 [Philipp Moritz] fix typos and move FixedSizeBufferOutputStream
e33443d [Philipp Moritz] fix setup.py develop for plasma
5f7b779 [Philipp Moritz] changes needed to make Ray work with Plasma in Arrow
b9e2dee [Philipp Moritz] fix windows build
3e4a84d [Philipp Moritz] fix segfault
0bea267 [Philipp Moritz] debug
23fe5f5 [Philipp Moritz] make plasma store binary part of the pyarrow package for tests
b863d13 [Philipp Moritz] fix
997de1e [Philipp Moritz] fix
47dc739 [Philipp Moritz] fixes
47033e7 [Philipp Moritz] switch to pytest
ed84c53 [Philipp Moritz] partial fixes
9bc5c15 [Philipp Moritz] implement wait and fetch for the client
45f338f [Philipp Moritz] test plasma on macOS
8b53618 [Philipp Moritz] fix
54f595e [Philipp Moritz] try fixing python 2 tests
2c6d652 [Philipp Moritz] convert docs to numpy format
3270628 [Philipp Moritz] try to get documentation up
44d1a55 [Philipp Moritz] cleanups and release GIL
a9f6502 [Philipp Moritz] more fixes
1ff88e7 [Philipp Moritz] fix travix ci
348f9bf [Philipp Moritz] fixes
4ae1a27 [Philipp Moritz] fix
fd80203 [Philipp Moritz] Plasma Python extension packaging: It compiles!
3b69973 [Robert Nishihara] Fixed minor python linting.
c9f6bcf [Robert Nishihara] Fix indentation and line lengths in plasma.pyx.
67b0951 [Robert Nishihara] Fix long lines in plasma/test/test.py.
e26527c [Robert Nishihara] Convert plasma test.py from 2 space indentation to 4 space indentation.
acc71d2 [Philipp Moritz] add round trip test for dataframes
2b7f949 [Philipp Moritz] implement mutable arrow python buffers
c06f1b5 [Philipp Moritz] fix test
1d7928f [Philipp Moritz] add arrow roundtrip test
6371e2e [Philipp Moritz] fix tests
3021d59 [Philipp Moritz] make ObjectID pickleable
dd5a7d8 [Philipp Moritz] fix tests
777e9c7 [Philipp Moritz] introduce plasma namespace
a4a9628 [Philipp Moritz] fix c++ tests
924888b [Philipp Moritz] update
f970df3 [Philipp Moritz] reduce logging
2ff2480 [Philipp Moritz] workaround for python visibility
d4934a9 [Philipp Moritz] update
cba92c1 [Philipp Moritz] setup.py for plasma
066d0ea [Philipp Moritz] test
1aea320 [Philipp Moritz] run plasma tests
3c4de52 [Philipp Moritz] use cmake to build the cython extension
bf39297 [Philipp Moritz] build and install pyarrow for plasma tests
5bf722a [Philipp Moritz] fix plasma path
1c5434c [Philipp Moritz] fix formatting
187cc24 [Philipp Moritz] add travis tests
c3d462d [Philipp Moritz] remove Python C extension
d9261b4 [Philipp Moritz] add documentation and license
db2d09a [Philipp Moritz] get all python tests in place
78d08ac [Philipp Moritz] make eviction work in Cython
18e0ac4 [Philipp Moritz] get tests
bc681ca [Philipp Moritz] port some python tests
f8e05f2 [Philipp Moritz] implement plasma.get in the cython client
d590c8a [Philipp Moritz] update
5178ee7 [Philipp Moritz] update
9044a01 [Philipp Moritz] initial plasma cython client commit


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a94f4716
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a94f4716
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a94f4716

Branch: refs/heads/master
Commit: a94f4716be8c33e86222d5a0be5a4d2a9102b93d
Parents: 05f7058
Author: Philipp Moritz <pc...@gmail.com>
Authored: Mon Jul 24 12:12:42 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jul 24 12:12:42 2017 -0400

----------------------------------------------------------------------
 .travis.yml                                |  21 +
 ci/travis_script_manylinux.sh              |   2 +-
 ci/travis_script_plasma.sh                 |  97 ++++
 ci/travis_script_python.sh                 |   4 +-
 cpp/src/arrow/util/logging.h               |   6 +-
 cpp/src/plasma/CMakeLists.txt              |  51 +-
 cpp/src/plasma/client.cc                   |  86 ++-
 cpp/src/plasma/client.h                    |  62 ++-
 cpp/src/plasma/common.cc                   |   9 +-
 cpp/src/plasma/common.h                    |  39 +-
 cpp/src/plasma/events.cc                   |   4 +
 cpp/src/plasma/events.h                    |   4 +
 cpp/src/plasma/eviction_policy.cc          |   4 +
 cpp/src/plasma/eviction_policy.h           |   4 +
 cpp/src/plasma/extension.cc                | 456 ----------------
 cpp/src/plasma/extension.h                 |  50 --
 cpp/src/plasma/plasma.cc                   |   4 +
 cpp/src/plasma/plasma.h                    |  53 +-
 cpp/src/plasma/plasma.pc.in                |  30 ++
 cpp/src/plasma/protocol.cc                 |   6 +-
 cpp/src/plasma/protocol.h                  |   6 +-
 cpp/src/plasma/store.cc                    |  12 +-
 cpp/src/plasma/store.h                     |   4 +
 cpp/src/plasma/test/client_tests.cc        |  10 +-
 cpp/src/plasma/test/serialization_tests.cc |   4 +
 python/CMakeLists.txt                      |  18 +
 python/cmake_modules/FindPlasma.cmake      |  99 ++++
 python/doc/source/api.rst                  |  15 +
 python/manylinux1/build_arrow.sh           |   5 +-
 python/pyarrow/__init__.py                 |   2 +-
 python/pyarrow/error.pxi                   |  18 +
 python/pyarrow/includes/common.pxd         |   3 +
 python/pyarrow/includes/libarrow.pxd       |   9 +
 python/pyarrow/io.pxi                      |  23 +-
 python/pyarrow/plasma.pyx                  | 560 +++++++++++++++++++
 python/pyarrow/tests/conftest.py           |   8 +-
 python/pyarrow/tests/test_plasma.py        | 683 ++++++++++++++++++++++++
 python/setup.py                            |  15 +
 38 files changed, 1855 insertions(+), 631 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index cdf787c..9cc2b86 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -120,6 +120,27 @@ matrix:
     - $TRAVIS_BUILD_DIR/ci/travis_before_script_c_glib.sh
     script:
     - $TRAVIS_BUILD_DIR/ci/travis_script_c_glib.sh
+  - compiler: gcc
+    language: cpp
+    os: linux
+    group: deprecated
+    before_script:
+    - export CC="gcc-4.9"
+    - export CXX="g++-4.9"
+    - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
+    script:
+    - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh
+    - $TRAVIS_BUILD_DIR/ci/travis_script_plasma.sh
+  - compiler: clang
+    osx_image: xcode6.4
+    os: osx
+    cache:
+    addons:
+    before_script:
+    - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
+    script:
+    - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh
+    - $TRAVIS_BUILD_DIR/ci/travis_script_plasma.sh
 
 before_install:
 - ulimit -c unlimited -S

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/ci/travis_script_manylinux.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_manylinux.sh b/ci/travis_script_manylinux.sh
index 4e6be62..844d5f7 100755
--- a/ci/travis_script_manylinux.sh
+++ b/ci/travis_script_manylinux.sh
@@ -18,4 +18,4 @@ set -ex
 pushd python/manylinux1
 git clone ../../ arrow
 docker build -t arrow-base-x86_64 -f Dockerfile-x86_64 .
-docker run --rm -e PYARROW_PARALLEL=3 -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh
+docker run --shm-size=2g --rm -e PYARROW_PARALLEL=3 -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/ci/travis_script_plasma.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_plasma.sh b/ci/travis_script_plasma.sh
new file mode 100755
index 0000000..fa384ad
--- /dev/null
+++ b/ci/travis_script_plasma.sh
@@ -0,0 +1,97 @@
+#!/usr/bin/env bash
+
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License. See accompanying LICENSE file.
+
+set -e
+
+source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh
+
+export ARROW_HOME=$ARROW_CPP_INSTALL
+export PYARROW_WITH_PLASMA=1
+
+pushd $ARROW_PYTHON_DIR
+
+function build_arrow_libraries() {
+  CPP_BUILD_DIR=$1
+  CPP_DIR=$TRAVIS_BUILD_DIR/cpp
+
+  mkdir $CPP_BUILD_DIR
+  pushd $CPP_BUILD_DIR
+
+  cmake -DARROW_BUILD_TESTS=off \
+        -DARROW_PYTHON=on \
+        -DARROW_PLASMA=on \
+        -DCMAKE_INSTALL_PREFIX=$2 \
+        $CPP_DIR
+
+  make -j4
+  make install
+
+  popd
+}
+
+python_version_tests() {
+  PYTHON_VERSION=$1
+  CONDA_ENV_DIR=$TRAVIS_BUILD_DIR/pyarrow-test-$PYTHON_VERSION
+
+  export ARROW_HOME=$TRAVIS_BUILD_DIR/arrow-install-$PYTHON_VERSION
+  export LD_LIBRARY_PATH=$ARROW_HOME/lib:$PARQUET_HOME/lib
+
+  conda create -y -q -p $CONDA_ENV_DIR python=$PYTHON_VERSION cmake curl
+  source activate $CONDA_ENV_DIR
+
+  python --version
+  which python
+
+  # faster builds, please
+  conda install -y -q nomkl
+
+  # Expensive dependencies install from Continuum package repo
+  conda install -y -q pip numpy pandas cython
+
+  # Build C++ libraries
+  build_arrow_libraries arrow-build-$PYTHON_VERSION $ARROW_HOME
+
+  # Other stuff pip install
+  pip install -r requirements.txt
+
+  python setup.py build_ext --inplace
+
+  python -m pytest -vv -r sxX pyarrow
+
+  # Build documentation once
+  if [[ "$PYTHON_VERSION" == "3.6" ]]
+  then
+      conda install -y -q --file=doc/requirements.txt
+      python setup.py build_sphinx -s doc/source
+  fi
+
+  # Build and install pyarrow
+  pushd $TRAVIS_BUILD_DIR/python
+    python setup.py install
+  popd
+
+  # Run Plasma tests
+  pushd $TRAVIS_BUILD_DIR/python
+    python -m pytest pyarrow/tests/test_plasma.py
+    if [ $TRAVIS_OS_NAME == "linux" ]; then
+      PLASMA_VALGRIND=1 python -m pytest pyarrow/tests/test_plasma.py
+    fi
+  popd
+}
+
+# run tests for python 2.7 and 3.6
+python_version_tests 2.7
+python_version_tests 3.6
+
+popd

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/ci/travis_script_python.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index ac64c54..fdb5ad6 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -17,6 +17,7 @@ set -e
 source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh
 
 export ARROW_HOME=$ARROW_CPP_INSTALL
+export PYARROW_WITH_PLASMA=1
 
 pushd $ARROW_PYTHON_DIR
 export PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env
@@ -71,9 +72,8 @@ function build_arrow_libraries() {
   pushd $CPP_BUILD_DIR
 
   cmake -DARROW_BUILD_TESTS=off \
-        -DARROW_PYTHON=on \
-        -DPLASMA_PYTHON=on \
         -DARROW_PLASMA=on \
+        -DARROW_PYTHON=on \
         -DCMAKE_INSTALL_PREFIX=$2 \
         $CPP_DIR
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index b618121..0edaa9d 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -113,8 +113,10 @@ class CerrLog {
 
   template <class T>
   CerrLog& operator<<(const T& t) {
-    has_logged_ = true;
-    std::cerr << t;
+    if (severity_ != ARROW_DEBUG) {
+      has_logged_ = true;
+      std::cerr << t;
+    }
     return *this;
   }
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
index 4ff3beb..8bb7e71 100644
--- a/cpp/src/plasma/CMakeLists.txt
+++ b/cpp/src/plasma/CMakeLists.txt
@@ -19,16 +19,13 @@ cmake_minimum_required(VERSION 2.8)
 
 project(plasma)
 
+set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/../python/cmake_modules")
+
 find_package(PythonLibsNew REQUIRED)
 find_package(Threads)
 
-option(PLASMA_PYTHON
-  "Build the Plasma Python extensions"
-  OFF)
-
-if(APPLE)
-  SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so")
-endif(APPLE)
+set(PLASMA_SO_VERSION "0")
+set(PLASMA_ABI_VERSION "${PLASMA_SO_VERSION}.0.0")
 
 include_directories(SYSTEM ${PYTHON_INCLUDE_DIRS})
 include_directories("${FLATBUFFERS_INCLUDE_DIR}" "${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/" "${CMAKE_CURRENT_LIST_DIR}/../")
@@ -40,7 +37,7 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion")
 # Compile flatbuffers
 
 set(PLASMA_FBS_SRC "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs" "${CMAKE_CURRENT_LIST_DIR}/format/common.fbs")
-set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/format/)
+set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/)
 
 set(PLASMA_FBS_OUTPUT_FILES
   "${OUTPUT_DIR}/common_generated.h"
@@ -69,8 +66,6 @@ endif()
 
 set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
 
-set_source_files_properties(extension.cc PROPERTIES COMPILE_FLAGS -Wno-strict-aliasing)
-
 set(PLASMA_SRCS
   client.cc
   common.cc
@@ -97,17 +92,33 @@ set_source_files_properties(malloc.cc PROPERTIES COMPILE_FLAGS "-Wno-error -O3")
 add_executable(plasma_store store.cc)
 target_link_libraries(plasma_store plasma_static)
 
+# Headers: top level
+install(FILES
+  common.h
+  common_generated.h
+  client.h
+  events.h
+  plasma.h
+  plasma_generated.h
+  protocol.h
+  DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/plasma")
+
+# Plasma store
+install(TARGETS plasma_store DESTINATION ${CMAKE_INSTALL_BINDIR})
+
+# pkg-config support
+configure_file(plasma.pc.in
+  "${CMAKE_CURRENT_BINARY_DIR}/plasma.pc"
+  @ONLY)
+install(
+  FILES "${CMAKE_CURRENT_BINARY_DIR}/plasma.pc"
+  DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
+
+#######################################
+# Unit tests
+#######################################
+
 ADD_ARROW_TEST(test/serialization_tests)
 ARROW_TEST_LINK_LIBRARIES(test/serialization_tests plasma_static)
 ADD_ARROW_TEST(test/client_tests)
 ARROW_TEST_LINK_LIBRARIES(test/client_tests plasma_static)
-
-if(PLASMA_PYTHON)
-  add_library(plasma_extension SHARED extension.cc)
-
-  if(APPLE)
-    target_link_libraries(plasma_extension plasma_static "-undefined dynamic_lookup")
-  else(APPLE)
-    target_link_libraries(plasma_extension plasma_static -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive)
-  endif(APPLE)
-endif()

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index dcb78e7..62bfbec 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -51,11 +51,31 @@
 
 #define XXH64_DEFAULT_SEED 0
 
+namespace plasma {
+
 // Number of threads used for memcopy and hash computations.
 constexpr int64_t kThreadPoolSize = 8;
 constexpr int64_t kBytesInMB = 1 << 20;
 static std::vector<std::thread> threadpool_(kThreadPoolSize);
 
+struct ObjectInUseEntry {
+  /// A count of the number of times this client has called PlasmaClient::Create
+  /// or
+  /// PlasmaClient::Get on this object ID minus the number of calls to
+  /// PlasmaClient::Release.
+  /// When this count reaches zero, we remove the entry from the ObjectsInUse
+  /// and decrement a count in the relevant ClientMmapTableEntry.
+  int count;
+  /// Cached information to read the object.
+  PlasmaObject object;
+  /// A flag representing whether the object has been sealed.
+  bool is_sealed;
+};
+
+PlasmaClient::PlasmaClient() {}
+
+PlasmaClient::~PlasmaClient() {}
+
 // If the file descriptor fd has been mmapped in this client process before,
 // return the pointer that was returned by mmap, otherwise mmap it and store the
 // pointer in a hash table.
@@ -300,6 +320,10 @@ Status PlasmaClient::PerformRelease(const ObjectID& object_id) {
 }
 
 Status PlasmaClient::Release(const ObjectID& object_id) {
+  // If the client is already disconnected, ignore release requests.
+  if (store_conn_ < 0) {
+    return Status::OK();
+  }
   // Add the new object to the release history.
   release_history_.push_front(object_id);
   // If there are too many bytes in use by the client or if there are too many
@@ -386,22 +410,6 @@ static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
   return XXH64_digest(&hash_state);
 }
 
-bool plasma_compute_object_hash(
-    PlasmaClient* conn, ObjectID object_id, unsigned char* digest) {
-  // Get the plasma object data. We pass in a timeout of 0 to indicate that
-  // the operation should timeout immediately.
-  ObjectBuffer object_buffer;
-  ARROW_CHECK_OK(conn->Get(&object_id, 1, 0, &object_buffer));
-  // If the object was not retrieved, return false.
-  if (object_buffer.data_size == -1) { return false; }
-  // Compute the hash.
-  uint64_t hash = compute_object_hash(object_buffer);
-  memcpy(digest, &hash, sizeof(hash));
-  // Release the plasma object.
-  ARROW_CHECK_OK(conn->Release(object_id));
-  return true;
-}
-
 Status PlasmaClient::Seal(const ObjectID& object_id) {
   // Make sure this client has a reference to the object before sending the
   // request to Plasma.
@@ -413,7 +421,7 @@ Status PlasmaClient::Seal(const ObjectID& object_id) {
   object_entry->second->is_sealed = true;
   /// Send the seal request to Plasma.
   static unsigned char digest[kDigestSize];
-  ARROW_CHECK(plasma_compute_object_hash(this, object_id, &digest[0]));
+  RETURN_NOT_OK(Hash(object_id, &digest[0]));
   RETURN_NOT_OK(SendSealRequest(store_conn_, object_id, &digest[0]));
   // We call PlasmaClient::Release to decrement the number of instances of this
   // object
@@ -439,6 +447,22 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
   return ReadEvictReply(buffer.data(), num_bytes_evicted);
 }
 
+Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
+  // Get the plasma object data. We pass in a timeout of 0 to indicate that
+  // the operation should timeout immediately.
+  ObjectBuffer object_buffer;
+  RETURN_NOT_OK(Get(&object_id, 1, 0, &object_buffer));
+  // If the object was not retrieved, return false.
+  if (object_buffer.data_size == -1) {
+    return Status::PlasmaObjectNonexistent("Object not found");
+  }
+  // Compute the hash.
+  uint64_t hash = compute_object_hash(object_buffer);
+  memcpy(digest, &hash, sizeof(hash));
+  // Release the plasma object.
+  return Release(object_id);
+}
+
 Status PlasmaClient::Subscribe(int* fd) {
   int sock[2];
   // Create a non-blocking socket pair. This will only be used to send
@@ -459,6 +483,26 @@ Status PlasmaClient::Subscribe(int* fd) {
   return Status::OK();
 }
 
+Status PlasmaClient::GetNotification(
+    int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
+  uint8_t* notification = read_message_async(fd);
+  if (notification == NULL) {
+    return Status::IOError("Failed to read object notification from Plasma socket");
+  }
+  auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
+  ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
+  memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
+  if (object_info->is_deletion()) {
+    *data_size = -1;
+    *metadata_size = -1;
+  } else {
+    *data_size = object_info->data_size();
+    *metadata_size = object_info->metadata_size();
+  }
+  delete[] notification;
+  return Status::OK();
+}
+
 Status PlasmaClient::Connect(const std::string& store_socket_name,
     const std::string& manager_socket_name, int release_delay) {
   store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
@@ -485,7 +529,11 @@ Status PlasmaClient::Disconnect() {
   // Close the connections to Plasma. The Plasma store will release the objects
   // that were in use by us when handling the SIGPIPE.
   close(store_conn_);
-  if (manager_conn_ >= 0) { close(manager_conn_); }
+  store_conn_ = -1;
+  if (manager_conn_ >= 0) {
+    close(manager_conn_);
+    manager_conn_ = -1;
+  }
   return Status::OK();
 }
 
@@ -555,3 +603,5 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req
   }
   return Status::OK();
 }
+
+} // namespace plasma

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/client.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index fb3a161..d9ed9f7 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -22,12 +22,18 @@
 #include <time.h>
 
 #include <deque>
+#include <memory>
 #include <string>
+#include <unordered_map>
 
-#include "plasma/plasma.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+#include "plasma/common.h"
 
 using arrow::Status;
 
+namespace plasma {
+
 #define PLASMA_DEFAULT_RELEASE_DELAY 64
 
 // Use 100MB as an overestimate of the L3 cache size.
@@ -63,22 +69,16 @@ struct ClientMmapTableEntry {
   int count;
 };
 
-struct ObjectInUseEntry {
-  /// A count of the number of times this client has called PlasmaClient::Create
-  /// or
-  /// PlasmaClient::Get on this object ID minus the number of calls to
-  /// PlasmaClient::Release.
-  /// When this count reaches zero, we remove the entry from the ObjectsInUse
-  /// and decrement a count in the relevant ClientMmapTableEntry.
-  int count;
-  /// Cached information to read the object.
-  PlasmaObject object;
-  /// A flag representing whether the object has been sealed.
-  bool is_sealed;
-};
+struct ObjectInUseEntry;
+struct ObjectRequest;
+struct PlasmaObject;
 
-class PlasmaClient {
+class ARROW_EXPORT PlasmaClient {
  public:
+  PlasmaClient();
+
+  ~PlasmaClient();
+
   /// Connect to the local plasma store and plasma manager. Return
   /// the resulting connection.
   ///
@@ -177,10 +177,18 @@ class PlasmaClient {
   /// @return The return status.
   Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);
 
+  /// Compute the hash of an object in the object store.
+  ///
+  /// @param conn The object containing the connection state.
+  /// @param object_id The ID of the object we want to hash.
+  /// @param digest A pointer at which to return the hash digest of the object.
+  ///        The pointer must have at least kDigestSize bytes allocated.
+  /// @return The return status.
+  Status Hash(const ObjectID& object_id, uint8_t* digest);
+
   /// Subscribe to notifications when objects are sealed in the object store.
   /// Whenever an object is sealed, a message will be written to the client
-  /// socket
-  /// that is returned by this method.
+  /// socket that is returned by this method.
   ///
   /// @param fd Out parameter for the file descriptor the client should use to
   /// read notifications
@@ -188,6 +196,16 @@ class PlasmaClient {
   /// @return The return status.
   Status Subscribe(int* fd);
 
+  /// Receive next object notification for this client if Subscribe has been called.
+  ///
+  /// @param fd The file descriptor we are reading the notification from.
+  /// @param object_id Out parameter, the object_id of the object that was sealed.
+  /// @param data_size Out parameter, the data size of the object that was sealed.
+  /// @param metadata_size Out parameter, the metadata size of the object that was sealed.
+  /// @return The return status.
+  Status GetNotification(
+      int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size);
+
   /// Disconnect from the local plasma instance, including the local store and
   /// manager.
   ///
@@ -330,14 +348,6 @@ class PlasmaClient {
   int64_t store_capacity_;
 };
 
-/// Compute the hash of an object in the object store.
-///
-/// @param conn The object containing the connection state.
-/// @param object_id The ID of the object we want to hash.
-/// @param digest A pointer at which to return the hash digest of the object.
-///        The pointer must have at least DIGEST_SIZE bytes allocated.
-/// @return A boolean representing whether the hash operation succeeded.
-bool plasma_compute_object_hash(
-    PlasmaClient* conn, ObjectID object_id, unsigned char* digest);
+} // namespace plasma
 
 #endif  // PLASMA_CLIENT_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/common.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
index a09a963..a5f530e 100644
--- a/cpp/src/plasma/common.cc
+++ b/cpp/src/plasma/common.cc
@@ -19,7 +19,9 @@
 
 #include <random>
 
-#include "format/plasma_generated.h"
+#include "plasma/plasma_generated.h"
+
+namespace plasma {
 
 using arrow::Status;
 
@@ -81,3 +83,8 @@ Status plasma_error_status(int plasma_error) {
   }
   return Status::OK();
 }
+
+ARROW_EXPORT int ObjectStatusLocal = ObjectStatus_Local;
+ARROW_EXPORT int ObjectStatusRemote = ObjectStatus_Remote;
+
+} // namespace plasma

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/common.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index 85dc74b..6f2d4dd 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -29,9 +29,11 @@
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
 
+namespace plasma {
+
 constexpr int64_t kUniqueIDSize = 20;
 
-class UniqueID {
+class ARROW_EXPORT UniqueID {
  public:
   static UniqueID from_random();
   static UniqueID from_binary(const std::string& binary);
@@ -60,4 +62,39 @@ typedef UniqueID ObjectID;
 
 arrow::Status plasma_error_status(int plasma_error);
 
+/// Size of object hash digests.
+constexpr int64_t kDigestSize = sizeof(uint64_t);
+
+/// Object request data structure. Used for Wait.
+struct ObjectRequest {
+  /// The ID of the requested object. If ID_NIL request any object.
+  ObjectID object_id;
+  /// Request associated to the object. It can take one of the following values:
+  ///  - PLASMA_QUERY_LOCAL: return if or when the object is available in the
+  ///    local Plasma Store.
+  ///  - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
+  ///    the system (i.e., either in the local or a remote Plasma Store).
+  int type;
+  /// Object status. Same as the status returned by plasma_status() function
+  /// call. This is filled in by plasma_wait_for_objects1():
+  ///  - ObjectStatus_Local: object is ready at the local Plasma Store.
+  ///  - ObjectStatus_Remote: object is ready at a remote Plasma Store.
+  ///  - ObjectStatus_Nonexistent: object does not exist in the system.
+  ///  - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
+  ///    for being transferred or it is transferring.
+  int status;
+};
+
+enum ObjectRequestType {
+  /// Query for object in the local plasma store.
+  PLASMA_QUERY_LOCAL = 1,
+  /// Query for object in the local plasma store or in a remote plasma store.
+  PLASMA_QUERY_ANYWHERE
+};
+
+extern int ObjectStatusLocal;
+extern int ObjectStatusRemote;
+
+} // namespace plasma
+
 #endif  // PLASMA_COMMON_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/events.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc
index a9f7356..675424d 100644
--- a/cpp/src/plasma/events.cc
+++ b/cpp/src/plasma/events.cc
@@ -19,6 +19,8 @@
 
 #include <errno.h>
 
+namespace plasma {
+
 void EventLoop::file_event_callback(
     aeEventLoop* loop, int fd, void* context, int events) {
   FileCallback* callback = reinterpret_cast<FileCallback*>(context);
@@ -79,3 +81,5 @@ int EventLoop::remove_timer(int64_t timer_id) {
   timer_callbacks_.erase(timer_id);
   return err;
 }
+
+} // namespace plasma

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/events.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h
index bd93d6b..b989b7f 100644
--- a/cpp/src/plasma/events.h
+++ b/cpp/src/plasma/events.h
@@ -26,6 +26,8 @@ extern "C" {
 #include "ae/ae.h"
 }
 
+namespace plasma {
+
 /// Constant specifying that the timer is done and it will be removed.
 constexpr int kEventLoopTimerDone = AE_NOMORE;
 
@@ -96,4 +98,6 @@ class EventLoop {
   std::unordered_map<int64_t, std::unique_ptr<TimerCallback>> timer_callbacks_;
 };
 
+} // namespace plasma
+
 #endif  // PLASMA_EVENTS

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/eviction_policy.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
index 4ae6384..ef18e33 100644
--- a/cpp/src/plasma/eviction_policy.cc
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -19,6 +19,8 @@
 
 #include <algorithm>
 
+namespace plasma {
+
 void LRUCache::add(const ObjectID& key, int64_t size) {
   auto it = item_map_.find(key);
   ARROW_CHECK(it == item_map_.end());
@@ -105,3 +107,5 @@ void EvictionPolicy::end_object_access(
   /* Add the object to the LRU cache.*/
   cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
 }
+
+} // namespace plasma

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/eviction_policy.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
index 3815fc6..c4f2183 100644
--- a/cpp/src/plasma/eviction_policy.h
+++ b/cpp/src/plasma/eviction_policy.h
@@ -26,6 +26,8 @@
 #include "plasma/common.h"
 #include "plasma/plasma.h"
 
+namespace plasma {
+
 // ==== The eviction policy ====
 //
 // This file contains declaration for all functions and data structures that
@@ -131,4 +133,6 @@ class EvictionPolicy {
   LRUCache cache_;
 };
 
+} // namespace plasma
+
 #endif  // PLASMA_EVICTION_POLICY_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/extension.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.cc b/cpp/src/plasma/extension.cc
deleted file mode 100644
index 5d61e33..0000000
--- a/cpp/src/plasma/extension.cc
+++ /dev/null
@@ -1,456 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/extension.h"
-
-#include <algorithm>
-#include <vector>
-
-#include "plasma/client.h"
-#include "plasma/common.h"
-#include "plasma/io.h"
-#include "plasma/protocol.h"
-
-PyObject* PlasmaOutOfMemoryError;
-PyObject* PlasmaObjectExistsError;
-
-PyObject* PyPlasma_connect(PyObject* self, PyObject* args) {
-  const char* store_socket_name;
-  const char* manager_socket_name;
-  int release_delay;
-  if (!PyArg_ParseTuple(
-          args, "ssi", &store_socket_name, &manager_socket_name, &release_delay)) {
-    return NULL;
-  }
-  PlasmaClient* client = new PlasmaClient();
-  ARROW_CHECK_OK(client->Connect(store_socket_name, manager_socket_name, release_delay));
-
-  return PyCapsule_New(client, "plasma", NULL);
-}
-
-PyObject* PyPlasma_disconnect(PyObject* self, PyObject* args) {
-  PyObject* client_capsule;
-  if (!PyArg_ParseTuple(args, "O", &client_capsule)) { return NULL; }
-  PlasmaClient* client;
-  ARROW_CHECK(PyObjectToPlasmaClient(client_capsule, &client));
-  ARROW_CHECK_OK(client->Disconnect());
-  /* We use the context of the connection capsule to indicate if the connection
-   * is still active (if the context is NULL) or if it is closed (if the context
-   * is (void*) 0x1). This is neccessary because the primary pointer of the
-   * capsule cannot be NULL. */
-  PyCapsule_SetContext(client_capsule, reinterpret_cast<void*>(0x1));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_create(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  Py_ssize_t size;
-  PyObject* metadata;
-  if (!PyArg_ParseTuple(args, "O&O&nO", PyObjectToPlasmaClient, &client,
-          PyStringToUniqueID, &object_id, &size, &metadata)) {
-    return NULL;
-  }
-  if (!PyByteArray_Check(metadata)) {
-    PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray");
-    return NULL;
-  }
-  uint8_t* data;
-  Status s = client->Create(object_id, size,
-      reinterpret_cast<uint8_t*>(PyByteArray_AsString(metadata)),
-      PyByteArray_Size(metadata), &data);
-  if (s.IsPlasmaObjectExists()) {
-    PyErr_SetString(PlasmaObjectExistsError,
-        "An object with this ID already exists in the plasma "
-        "store.");
-    return NULL;
-  }
-  if (s.IsPlasmaStoreFull()) {
-    PyErr_SetString(PlasmaOutOfMemoryError,
-        "The plasma store ran out of memory and could not create "
-        "this object.");
-    return NULL;
-  }
-  ARROW_CHECK(s.ok());
-
-#if PY_MAJOR_VERSION >= 3
-  return PyMemoryView_FromMemory(reinterpret_cast<char*>(data), size, PyBUF_WRITE);
-#else
-  return PyBuffer_FromReadWriteMemory(reinterpret_cast<void*>(data), size);
-#endif
-}
-
-PyObject* PyPlasma_hash(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  unsigned char digest[kDigestSize];
-  bool success = plasma_compute_object_hash(client, object_id, digest);
-  if (success) {
-    PyObject* digest_string =
-        PyBytes_FromStringAndSize(reinterpret_cast<char*>(digest), kDigestSize);
-    return digest_string;
-  } else {
-    Py_RETURN_NONE;
-  }
-}
-
-PyObject* PyPlasma_seal(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  ARROW_CHECK_OK(client->Seal(object_id));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_release(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  ARROW_CHECK_OK(client->Release(object_id));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_get(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  PyObject* object_id_list;
-  Py_ssize_t timeout_ms;
-  if (!PyArg_ParseTuple(
-          args, "O&On", PyObjectToPlasmaClient, &client, &object_id_list, &timeout_ms)) {
-    return NULL;
-  }
-
-  Py_ssize_t num_object_ids = PyList_Size(object_id_list);
-  std::vector<ObjectID> object_ids(num_object_ids);
-  std::vector<ObjectBuffer> object_buffers(num_object_ids);
-
-  for (int i = 0; i < num_object_ids; ++i) {
-    PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]);
-  }
-
-  Py_BEGIN_ALLOW_THREADS;
-  ARROW_CHECK_OK(
-      client->Get(object_ids.data(), num_object_ids, timeout_ms, object_buffers.data()));
-  Py_END_ALLOW_THREADS;
-
-  PyObject* returns = PyList_New(num_object_ids);
-  for (int i = 0; i < num_object_ids; ++i) {
-    if (object_buffers[i].data_size != -1) {
-      /* The object was retrieved, so return the object. */
-      PyObject* t = PyTuple_New(2);
-      Py_ssize_t data_size = static_cast<Py_ssize_t>(object_buffers[i].data_size);
-      Py_ssize_t metadata_size = static_cast<Py_ssize_t>(object_buffers[i].metadata_size);
-#if PY_MAJOR_VERSION >= 3
-      char* data = reinterpret_cast<char*>(object_buffers[i].data);
-      char* metadata = reinterpret_cast<char*>(object_buffers[i].metadata);
-      PyTuple_SET_ITEM(t, 0, PyMemoryView_FromMemory(data, data_size, PyBUF_READ));
-      PyTuple_SET_ITEM(
-          t, 1, PyMemoryView_FromMemory(metadata, metadata_size, PyBUF_READ));
-#else
-      void* data = reinterpret_cast<void*>(object_buffers[i].data);
-      void* metadata = reinterpret_cast<void*>(object_buffers[i].metadata);
-      PyTuple_SET_ITEM(t, 0, PyBuffer_FromMemory(data, data_size));
-      PyTuple_SET_ITEM(t, 1, PyBuffer_FromMemory(metadata, metadata_size));
-#endif
-      ARROW_CHECK(PyList_SetItem(returns, i, t) == 0);
-    } else {
-      /* The object was not retrieved, so just add None to the list of return
-       * values. */
-      Py_INCREF(Py_None);
-      ARROW_CHECK(PyList_SetItem(returns, i, Py_None) == 0);
-    }
-  }
-  return returns;
-}
-
-PyObject* PyPlasma_contains(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  bool has_object;
-  ARROW_CHECK_OK(client->Contains(object_id, &has_object));
-
-  if (has_object) {
-    Py_RETURN_TRUE;
-  } else {
-    Py_RETURN_FALSE;
-  }
-}
-
-PyObject* PyPlasma_fetch(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  PyObject* object_id_list;
-  if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaClient, &client, &object_id_list)) {
-    return NULL;
-  }
-  if (client->get_manager_fd() == -1) {
-    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
-    return NULL;
-  }
-  Py_ssize_t n = PyList_Size(object_id_list);
-  ObjectID* object_ids = new ObjectID[n];
-  for (int i = 0; i < n; ++i) {
-    PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]);
-  }
-  ARROW_CHECK_OK(client->Fetch(static_cast<int>(n), object_ids));
-  delete[] object_ids;
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_wait(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  PyObject* object_id_list;
-  Py_ssize_t timeout;
-  int num_returns;
-  if (!PyArg_ParseTuple(args, "O&Oni", PyObjectToPlasmaClient, &client, &object_id_list,
-          &timeout, &num_returns)) {
-    return NULL;
-  }
-  Py_ssize_t n = PyList_Size(object_id_list);
-
-  if (client->get_manager_fd() == -1) {
-    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
-    return NULL;
-  }
-  if (num_returns < 0) {
-    PyErr_SetString(
-        PyExc_RuntimeError, "The argument num_returns cannot be less than zero.");
-    return NULL;
-  }
-  if (num_returns > n) {
-    PyErr_SetString(PyExc_RuntimeError,
-        "The argument num_returns cannot be greater than len(object_ids)");
-    return NULL;
-  }
-  int64_t threshold = 1 << 30;
-  if (timeout > threshold) {
-    PyErr_SetString(
-        PyExc_RuntimeError, "The argument timeout cannot be greater than 2 ** 30.");
-    return NULL;
-  }
-
-  std::vector<ObjectRequest> object_requests(n);
-  for (int i = 0; i < n; ++i) {
-    ARROW_CHECK(PyStringToUniqueID(PyList_GetItem(object_id_list, i),
-                    &object_requests[i].object_id) == 1);
-    object_requests[i].type = PLASMA_QUERY_ANYWHERE;
-  }
-  /* Drop the global interpreter lock while we are waiting, so other threads can
-   * run. */
-  int num_return_objects;
-  Py_BEGIN_ALLOW_THREADS;
-  ARROW_CHECK_OK(
-      client->Wait(n, object_requests.data(), num_returns, timeout, &num_return_objects));
-  Py_END_ALLOW_THREADS;
-
-  int num_to_return = std::min(num_return_objects, num_returns);
-  PyObject* ready_ids = PyList_New(num_to_return);
-  PyObject* waiting_ids = PySet_New(object_id_list);
-  int num_returned = 0;
-  for (int i = 0; i < n; ++i) {
-    if (num_returned == num_to_return) { break; }
-    if (object_requests[i].status == ObjectStatus_Local ||
-        object_requests[i].status == ObjectStatus_Remote) {
-      PyObject* ready = PyBytes_FromStringAndSize(
-          reinterpret_cast<char*>(&object_requests[i].object_id),
-          sizeof(object_requests[i].object_id));
-      PyList_SetItem(ready_ids, num_returned, ready);
-      PySet_Discard(waiting_ids, ready);
-      num_returned += 1;
-    } else {
-      ARROW_CHECK(object_requests[i].status == ObjectStatus_Nonexistent);
-    }
-  }
-  ARROW_CHECK(num_returned == num_to_return);
-  /* Return both the ready IDs and the remaining IDs. */
-  PyObject* t = PyTuple_New(2);
-  PyTuple_SetItem(t, 0, ready_ids);
-  PyTuple_SetItem(t, 1, waiting_ids);
-  return t;
-}
-
-PyObject* PyPlasma_evict(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  Py_ssize_t num_bytes;
-  if (!PyArg_ParseTuple(args, "O&n", PyObjectToPlasmaClient, &client, &num_bytes)) {
-    return NULL;
-  }
-  int64_t evicted_bytes;
-  ARROW_CHECK_OK(client->Evict(static_cast<int64_t>(num_bytes), evicted_bytes));
-  return PyLong_FromSsize_t(static_cast<Py_ssize_t>(evicted_bytes));
-}
-
-PyObject* PyPlasma_delete(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
-          &object_id)) {
-    return NULL;
-  }
-  ARROW_CHECK_OK(client->Delete(object_id));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_transfer(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  ObjectID object_id;
-  const char* addr;
-  int port;
-  if (!PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaClient, &client,
-          PyStringToUniqueID, &object_id, &addr, &port)) {
-    return NULL;
-  }
-
-  if (client->get_manager_fd() == -1) {
-    PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
-    return NULL;
-  }
-
-  ARROW_CHECK_OK(client->Transfer(addr, port, object_id));
-  Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_subscribe(PyObject* self, PyObject* args) {
-  PlasmaClient* client;
-  if (!PyArg_ParseTuple(args, "O&", PyObjectToPlasmaClient, &client)) { return NULL; }
-
-  int sock;
-  ARROW_CHECK_OK(client->Subscribe(&sock));
-  return PyLong_FromLong(sock);
-}
-
-PyObject* PyPlasma_receive_notification(PyObject* self, PyObject* args) {
-  int plasma_sock;
-
-  if (!PyArg_ParseTuple(args, "i", &plasma_sock)) { return NULL; }
-  /* Receive object notification from the plasma connection socket. If the
-   * object was added, return a tuple of its fields: ObjectID, data_size,
-   * metadata_size. If the object was deleted, data_size and metadata_size will
-   * be set to -1. */
-  uint8_t* notification = read_message_async(plasma_sock);
-  if (notification == NULL) {
-    PyErr_SetString(
-        PyExc_RuntimeError, "Failed to read object notification from Plasma socket");
-    return NULL;
-  }
-  auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
-  /* Construct a tuple from object_info and return. */
-  PyObject* t = PyTuple_New(3);
-  PyTuple_SetItem(t, 0, PyBytes_FromStringAndSize(object_info->object_id()->data(),
-                            object_info->object_id()->size()));
-  if (object_info->is_deletion()) {
-    PyTuple_SetItem(t, 1, PyLong_FromLong(-1));
-    PyTuple_SetItem(t, 2, PyLong_FromLong(-1));
-  } else {
-    PyTuple_SetItem(t, 1, PyLong_FromLong(object_info->data_size()));
-    PyTuple_SetItem(t, 2, PyLong_FromLong(object_info->metadata_size()));
-  }
-
-  delete[] notification;
-  return t;
-}
-
-static PyMethodDef plasma_methods[] = {
-    {"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."},
-    {"disconnect", PyPlasma_disconnect, METH_VARARGS, "Disconnect from plasma."},
-    {"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."},
-    {"hash", PyPlasma_hash, METH_VARARGS, "Compute the hash of a plasma object."},
-    {"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."},
-    {"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."},
-    {"contains", PyPlasma_contains, METH_VARARGS,
-        "Does the plasma store contain this plasma object?"},
-    {"fetch", PyPlasma_fetch, METH_VARARGS,
-        "Fetch the object from another plasma manager instance."},
-    {"wait", PyPlasma_wait, METH_VARARGS,
-        "Wait until num_returns objects in object_ids are ready."},
-    {"evict", PyPlasma_evict, METH_VARARGS,
-        "Evict some objects until we recover some number of bytes."},
-    {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."},
-    {"delete", PyPlasma_delete, METH_VARARGS, "Delete a plasma object."},
-    {"transfer", PyPlasma_transfer, METH_VARARGS,
-        "Transfer object to another plasma manager."},
-    {"subscribe", PyPlasma_subscribe, METH_VARARGS,
-        "Subscribe to the plasma notification socket."},
-    {"receive_notification", PyPlasma_receive_notification, METH_VARARGS,
-        "Receive next notification from plasma notification socket."},
-    {NULL} /* Sentinel */
-};
-
-#if PY_MAJOR_VERSION >= 3
-static struct PyModuleDef moduledef = {
-    PyModuleDef_HEAD_INIT, "libplasma",    /* m_name */
-    "A Python client library for plasma.", /* m_doc */
-    0,                                     /* m_size */
-    plasma_methods,                        /* m_methods */
-    NULL,                                  /* m_reload */
-    NULL,                                  /* m_traverse */
-    NULL,                                  /* m_clear */
-    NULL,                                  /* m_free */
-};
-#endif
-
-#if PY_MAJOR_VERSION >= 3
-#define INITERROR return NULL
-#else
-#define INITERROR return
-#endif
-
-#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
-#define PyMODINIT_FUNC void
-#endif
-
-#if PY_MAJOR_VERSION >= 3
-#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
-#else
-#define MOD_INIT(name) PyMODINIT_FUNC init##name(void)
-#endif
-
-MOD_INIT(libplasma) {
-#if PY_MAJOR_VERSION >= 3
-  PyObject* m = PyModule_Create(&moduledef);
-#else
-  PyObject* m =
-      Py_InitModule3("libplasma", plasma_methods, "A Python client library for plasma.");
-#endif
-
-  /* Create a custom exception for when an object ID is reused. */
-  char plasma_object_exists_error[] = "plasma_object_exists.error";
-  PlasmaObjectExistsError = PyErr_NewException(plasma_object_exists_error, NULL, NULL);
-  Py_INCREF(PlasmaObjectExistsError);
-  PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError);
-  /* Create a custom exception for when the plasma store is out of memory. */
-  char plasma_out_of_memory_error[] = "plasma_out_of_memory.error";
-  PlasmaOutOfMemoryError = PyErr_NewException(plasma_out_of_memory_error, NULL, NULL);
-  Py_INCREF(PlasmaOutOfMemoryError);
-  PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError);
-
-#if PY_MAJOR_VERSION >= 3
-  return m;
-#endif
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/extension.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.h b/cpp/src/plasma/extension.h
deleted file mode 100644
index cee30ab..0000000
--- a/cpp/src/plasma/extension.h
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_EXTENSION_H
-#define PLASMA_EXTENSION_H
-
-#undef _XOPEN_SOURCE
-#undef _POSIX_C_SOURCE
-#include <Python.h>
-
-#include "bytesobject.h"  // NOLINT
-
-#include "plasma/client.h"
-#include "plasma/common.h"
-
-static int PyObjectToPlasmaClient(PyObject* object, PlasmaClient** client) {
-  if (PyCapsule_IsValid(object, "plasma")) {
-    *client = reinterpret_cast<PlasmaClient*>(PyCapsule_GetPointer(object, "plasma"));
-    return 1;
-  } else {
-    PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule");
-    return 0;
-  }
-}
-
-int PyStringToUniqueID(PyObject* object, ObjectID* object_id) {
-  if (PyBytes_Check(object)) {
-    memcpy(object_id, PyBytes_AsString(object), sizeof(ObjectID));
-    return 1;
-  } else {
-    PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
-    return 0;
-  }
-}
-
-#endif  // PLASMA_EXTENSION_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/plasma.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
index 559d8e7..bfed500 100644
--- a/cpp/src/plasma/plasma.cc
+++ b/cpp/src/plasma/plasma.cc
@@ -24,6 +24,8 @@
 #include "plasma/common.h"
 #include "plasma/protocol.h"
 
+namespace plasma {
+
 int warn_if_sigpipe(int status, int client_sock) {
   if (status >= 0) { return 0; }
   if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
@@ -62,3 +64,5 @@ ObjectTableEntry* get_object_table_entry(
   if (it == store_info->objects.end()) { return NULL; }
   return it->second.get();
 }
+
+} // namespace plasma

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/plasma.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 275d0c7..db8669f 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -32,8 +32,10 @@
 
 #include "arrow/status.h"
 #include "arrow/util/logging.h"
-#include "format/common_generated.h"
 #include "plasma/common.h"
+#include "plasma/common_generated.h"
+
+namespace plasma {
 
 #define HANDLE_SIGPIPE(s, fd_)                                              \
   do {                                                                      \
@@ -54,47 +56,23 @@
 /// Allocation granularity used in plasma for object allocation.
 #define BLOCK_SIZE 64
 
-/// Size of object hash digests.
-constexpr int64_t kDigestSize = sizeof(uint64_t);
-
 struct Client;
 
-/// Object request data structure. Used in the plasma_wait_for_objects()
-/// argument.
-typedef struct {
-  /// The ID of the requested object. If ID_NIL request any object.
-  ObjectID object_id;
-  /// Request associated to the object. It can take one of the following values:
-  ///  - PLASMA_QUERY_LOCAL: return if or when the object is available in the
-  ///    local Plasma Store.
-  ///  - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
-  ///    the system (i.e., either in the local or a remote Plasma Store).
-  int type;
-  /// Object status. Same as the status returned by plasma_status() function
-  /// call. This is filled in by plasma_wait_for_objects1():
-  ///  - ObjectStatus_Local: object is ready at the local Plasma Store.
-  ///  - ObjectStatus_Remote: object is ready at a remote Plasma Store.
-  ///  - ObjectStatus_Nonexistent: object does not exist in the system.
-  ///  - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
-  ///    for being transferred or it is transferring.
-  int status;
-} ObjectRequest;
-
 /// Mapping from object IDs to type and status of the request.
 typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher> ObjectRequestMap;
 
 /// Handle to access memory mapped file and map it into client address space.
-typedef struct {
+struct object_handle {
   /// The file descriptor of the memory mapped file in the store. It is used as
   /// a unique identifier of the file in the client to look up the corresponding
   /// file descriptor on the client's side.
   int store_fd;
   /// The size in bytes of the memory mapped file.
   int64_t mmap_size;
-} object_handle;
+};
 
 // TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
-typedef struct {
+struct PlasmaObject {
   /// Handle for memory mapped file the object is stored in.
   object_handle handle;
   /// The offset in bytes in the memory mapped file of the data.
@@ -105,28 +83,21 @@ typedef struct {
   int64_t data_size;
   /// The size in bytes of the metadata.
   int64_t metadata_size;
-} PlasmaObject;
+};
 
-typedef enum {
+enum object_state {
   /// Object was created but not sealed in the local Plasma Store.
   PLASMA_CREATED = 1,
   /// Object is sealed and stored in the local Plasma Store.
   PLASMA_SEALED
-} object_state;
+};
 
-typedef enum {
+enum object_status {
   /// The object was not found.
   OBJECT_NOT_FOUND = 0,
   /// The object was found.
   OBJECT_FOUND = 1
-} object_status;
-
-typedef enum {
-  /// Query for object in the local plasma store.
-  PLASMA_QUERY_LOCAL = 1,
-  /// Query for object in the local plasma store or in a remote plasma store.
-  PLASMA_QUERY_ANYWHERE
-} object_request_type;
+};
 
 /// This type is used by the Plasma store. It is here because it is exposed to
 /// the eviction policy.
@@ -188,4 +159,6 @@ int warn_if_sigpipe(int status, int client_sock);
 
 uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
 
+} // namespace plasma
+
 #endif  // PLASMA_PLASMA_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/plasma.pc.in
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.pc.in b/cpp/src/plasma/plasma.pc.in
new file mode 100644
index 0000000..d868689
--- /dev/null
+++ b/cpp/src/plasma/plasma.pc.in
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+prefix=@CMAKE_INSTALL_PREFIX@
+libdir=${prefix}/@CMAKE_INSTALL_LIBDIR@
+includedir=${prefix}/include
+
+so_version=@PLASMA_SO_VERSION@
+abi_version=@PLASMA_ABI_VERSION@
+executable=${prefix}/@CMAKE_INSTALL_BINDIR@/plasma_store
+
+Name: Plasma
+Description: Plasma is an in-memory object store and cache for big data.
+Version: @PLASMA_VERSION@
+Libs: -L${libdir} -lplasma
+Cflags: -I${includedir}

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/protocol.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 246aa29..2998c68 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -18,11 +18,13 @@
 #include "plasma/protocol.h"
 
 #include "flatbuffers/flatbuffers.h"
-#include "format/plasma_generated.h"
+#include "plasma/plasma_generated.h"
 
 #include "plasma/common.h"
 #include "plasma/io.h"
 
+namespace plasma {
+
 using flatbuffers::uoffset_t;
 
 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
@@ -500,3 +502,5 @@ Status ReadDataReply(
   *metadata_size = (int64_t)message->metadata_size();
   return Status::OK();
 }
+
+} // namespace plasma

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/protocol.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 5d9d136..835c5a0 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -21,9 +21,11 @@
 #include <vector>
 
 #include "arrow/status.h"
-#include "format/plasma_generated.h"
+#include "plasma/plasma_generated.h"
 #include "plasma/plasma.h"
 
+namespace plasma {
+
 using arrow::Status;
 
 /* Plasma receive message. */
@@ -167,4 +169,6 @@ Status SendDataReply(
 Status ReadDataReply(
     uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size);
 
+} // namespace plasma
+
 #endif /* PLASMA_PROTOCOL */

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/store.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 9394e3d..8d4fb10 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -49,12 +49,14 @@
 #include <unordered_set>
 #include <vector>
 
-#include "format/common_generated.h"
+#include "plasma/common_generated.h"
 #include "plasma/common.h"
 #include "plasma/fling.h"
 #include "plasma/io.h"
 #include "plasma/malloc.h"
 
+namespace plasma {
+
 extern "C" {
 void* dlmalloc(size_t bytes);
 void* dlmemalign(size_t alignment, size_t bytes);
@@ -625,8 +627,10 @@ void start_server(char* socket_name, int64_t system_memory) {
   loop.run();
 }
 
+} // namespace plasma
+
 int main(int argc, char* argv[]) {
-  signal(SIGTERM, signal_handler);
+  signal(SIGTERM, plasma::signal_handler);
   char* socket_name = NULL;
   int64_t system_memory = -1;
   int c;
@@ -677,7 +681,7 @@ int main(int argc, char* argv[]) {
 #endif
   // Make it so dlmalloc fails if we try to request more memory than is
   // available.
-  dlmalloc_set_footprint_limit((size_t)system_memory);
+  plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
   ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
-  start_server(socket_name, system_memory);
+  plasma::start_server(socket_name, system_memory);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/store.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 8bd9426..27c3813 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -27,6 +27,8 @@
 #include "plasma/plasma.h"
 #include "plasma/protocol.h"
 
+namespace plasma {
+
 struct GetRequest;
 
 struct NotificationQueue {
@@ -166,4 +168,6 @@ class PlasmaStore {
   std::unordered_map<int, NotificationQueue> pending_notifications_;
 };
 
+} // namespace plasma
+
 #endif  // PLASMA_STORE_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/test/client_tests.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 29b5b13..6dc558e 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -29,7 +29,9 @@
 #include "plasma/plasma.h"
 #include "plasma/protocol.h"
 
-std::string g_test_executable;  // NOLINT
+namespace plasma {
+
+std::string test_executable;  // NOLINT
 
 class TestPlasmaStore : public ::testing::Test {
  public:
@@ -37,7 +39,7 @@ class TestPlasmaStore : public ::testing::Test {
   // stdout of the object store. Consider changing that.
   void SetUp() {
     std::string plasma_directory =
-        g_test_executable.substr(0, g_test_executable.find_last_of("/"));
+        test_executable.substr(0, test_executable.find_last_of("/"));
     std::string plasma_command =
         plasma_directory +
         "/plasma_store -m 1000000000 -s /tmp/store 1> /dev/null 2> /dev/null &";
@@ -125,8 +127,10 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
   ASSERT_EQ(object_buffer[1].data[0], 2);
 }
 
+} // namespace plasma
+
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
-  g_test_executable = std::string(argv[0]);
+  plasma::test_executable = std::string(argv[0]);
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/test/serialization_tests.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index 325cead..13938cd 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -25,6 +25,8 @@
 #include "plasma/plasma.h"
 #include "plasma/protocol.h"
 
+namespace plasma {
+
 /**
  * Create a temporary file. Needs to be closed by the caller.
  *
@@ -386,3 +388,5 @@ TEST(PlasmaSerialization, DataReply) {
   ASSERT_EQ(object_size1, object_size2);
   ASSERT_EQ(metadata_size1, metadata_size2);
 }
+
+} // namespace plasma

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 224147d..6ff6646 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -220,6 +220,12 @@ include_directories(SYSTEM
 find_package(Arrow REQUIRED)
 include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
 
+## Plasma
+find_package(Plasma)
+if (PLASMA_FOUND)
+  include_directories(SYSTEM ${PLASMA_INCLUDE_DIR})
+endif()
+
 function(bundle_arrow_lib library_path)
   get_filename_component(LIBRARY_DIR ${${library_path}} DIRECTORY)
   get_filename_component(LIBRARY_NAME ${${library_path}} NAME_WE)
@@ -252,6 +258,9 @@ if (PYARROW_BUNDLE_ARROW_CPP)
   file(COPY ${ARROW_INCLUDE_DIR}/arrow DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY}/include)
   bundle_arrow_lib(ARROW_SHARED_LIB)
   bundle_arrow_lib(ARROW_PYTHON_SHARED_LIB)
+  if (PLASMA_FOUND)
+    bundle_arrow_lib(PLASMA_SHARED_LIB)
+  endif()
 endif()
 
 if (MSVC)
@@ -278,9 +287,14 @@ set(CYTHON_EXTENSIONS
   lib
 )
 
+if (PLASMA_FOUND)
+  set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} plasma)
+endif()
+
 set(LINK_LIBS
   arrow_shared
   arrow_python_shared
+  ${PLASMA_SHARED_LIB}
 )
 
 if (PYARROW_BUILD_PARQUET)
@@ -379,3 +393,7 @@ foreach(module ${CYTHON_EXTENSIONS})
 
     target_link_libraries(${module_name} ${LINK_LIBS})
 endforeach(module)
+
+if (PLASMA_FOUND)
+  file(COPY ${PLASMA_EXECUTABLE} DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY})
+endif()

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/cmake_modules/FindPlasma.cmake
----------------------------------------------------------------------
diff --git a/python/cmake_modules/FindPlasma.cmake b/python/cmake_modules/FindPlasma.cmake
new file mode 100644
index 0000000..3acaa34
--- /dev/null
+++ b/python/cmake_modules/FindPlasma.cmake
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# - Find PLASMA (plasma/client.h, libplasma.a, libplasma.so)
+# This module defines
+#  PLASMA_INCLUDE_DIR, directory containing headers
+#  PLASMA_LIBS, directory containing plasma libraries
+#  PLASMA_STATIC_LIB, path to libplasma.a
+#  PLASMA_SHARED_LIB, path to libplasma's shared library
+#  PLASMA_SHARED_IMP_LIB, path to libplasma's import library (MSVC only)
+#  PLASMA_FOUND, whether plasma has been found
+
+include(FindPkgConfig)
+
+if ("$ENV{ARROW_HOME}" STREQUAL "")
+  pkg_check_modules(PLASMA plasma)
+  if (PLASMA_FOUND)
+    pkg_get_variable(PLASMA_EXECUTABLE plasma executable)
+    pkg_get_variable(PLASMA_ABI_VERSION plasma abi_version)
+    message(STATUS "Plasma ABI version: ${PLASMA_ABI_VERSION}")
+    pkg_get_variable(PLASMA_SO_VERSION plasma so_version)
+    message(STATUS "Plasma SO version: ${PLASMA_SO_VERSION}")
+    set(PLASMA_INCLUDE_DIR ${PLASMA_INCLUDE_DIRS})
+    set(PLASMA_LIBS ${PLASMA_LIBRARY_DIRS})
+    set(PLASMA_SEARCH_LIB_PATH ${PLASMA_LIBRARY_DIRS})
+  endif()
+else()
+  set(PLASMA_HOME "$ENV{ARROW_HOME}")
+
+  set(PLASMA_EXECUTABLE ${PLASMA_HOME}/bin/plasma_store)
+
+  set(PLASMA_SEARCH_HEADER_PATHS
+    ${PLASMA_HOME}/include
+    )
+
+  set(PLASMA_SEARCH_LIB_PATH
+    ${PLASMA_HOME}/lib
+    )
+
+  find_path(PLASMA_INCLUDE_DIR plasma/client.h PATHS
+    ${PLASMA_SEARCH_HEADER_PATHS}
+    # make sure we don't accidentally pick up a different version
+    NO_DEFAULT_PATH
+    )
+endif()
+
+find_library(PLASMA_LIB_PATH NAMES plasma
+  PATHS
+  ${PLASMA_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+get_filename_component(PLASMA_LIBS ${PLASMA_LIB_PATH} DIRECTORY)
+
+if (PLASMA_INCLUDE_DIR AND PLASMA_LIBS)
+  set(PLASMA_FOUND TRUE)
+  set(PLASMA_LIB_NAME plasma)
+
+  set(PLASMA_STATIC_LIB ${PLASMA_LIBS}/lib${PLASMA_LIB_NAME}.a)
+
+  set(PLASMA_SHARED_LIB ${PLASMA_LIBS}/lib${PLASMA_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+endif()
+
+if (PLASMA_FOUND)
+  if (NOT Plasma_FIND_QUIETLY)
+    message(STATUS "Found the Plasma core library: ${PLASMA_LIB_PATH}")
+    message(STATUS "Found Plasma executable: ${PLASMA_EXECUTABLE}")
+  endif ()
+else ()
+  if (NOT Plasma_FIND_QUIETLY)
+    set(PLASMA_ERR_MSG "Could not find the Plasma library. Looked for headers")
+    set(PLASMA_ERR_MSG "${PLASMA_ERR_MSG} in ${PLASMA_SEARCH_HEADER_PATHS}, and for libs")
+    set(PLASMA_ERR_MSG "${PLASMA_ERR_MSG} in ${PLASMA_SEARCH_LIB_PATH}")
+    if (Plasma_FIND_REQUIRED)
+      message(FATAL_ERROR "${PLASMA_ERR_MSG}")
+    else (Plasma_FIND_REQUIRED)
+      message(STATUS "${PLASMA_ERR_MSG}")
+    endif (Plasma_FIND_REQUIRED)
+  endif ()
+  set(PLASMA_FOUND FALSE)
+endif ()
+
+mark_as_advanced(
+  PLASMA_INCLUDE_DIR
+  PLASMA_STATIC_LIB
+  PLASMA_SHARED_LIB
+)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/doc/source/api.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index c52d400..780aa48 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -212,6 +212,21 @@ Type Classes
    Field
    Schema
 
+.. currentmodule:: pyarrow.plasma
+
+.. _api.plasma:
+
+In-Memory Object Store
+----------------------
+
+.. autosummary::
+   :toctree: generated/
+
+   ObjectID
+   PlasmaClient
+   PlasmaBuffer
+   MutablePlasmaBuffer
+
 .. currentmodule:: pyarrow.parquet
 
 .. _api.parquet:

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/manylinux1/build_arrow.sh
----------------------------------------------------------------------
diff --git a/python/manylinux1/build_arrow.sh b/python/manylinux1/build_arrow.sh
index 8c6bda9..85c096a 100755
--- a/python/manylinux1/build_arrow.sh
+++ b/python/manylinux1/build_arrow.sh
@@ -35,6 +35,7 @@ cd /arrow/python
 # PyArrow build configuration
 export PYARROW_BUILD_TYPE='release'
 export PYARROW_WITH_PARQUET=1
+export PYARROW_WITH_PLASMA=1
 export PYARROW_BUNDLE_ARROW_CPP=1
 # Need as otherwise arrow_io is sometimes not linked
 export LDFLAGS="-Wl,--no-as-needed"
@@ -52,7 +53,7 @@ for PYTHON in ${PYTHON_VERSIONS}; do
     ARROW_BUILD_DIR=/arrow/cpp/build-PY${PYTHON}
     mkdir -p "${ARROW_BUILD_DIR}"
     pushd "${ARROW_BUILD_DIR}"
-    PATH="$(cpython_path $PYTHON)/bin:$PATH" cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/arrow-dist -DARROW_BUILD_TESTS=OFF -DARROW_BUILD_SHARED=ON -DARROW_BOOST_USE_SHARED=OFF -DARROW_JEMALLOC=ON -DARROW_RPATH_ORIGIN=ON -DARROW_JEMALLOC_USE_SHARED=OFF -DARROW_PYTHON=ON -DPythonInterp_FIND_VERSION=${PYTHON} ..
+    PATH="$(cpython_path $PYTHON)/bin:$PATH" cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/arrow-dist -DARROW_BUILD_TESTS=OFF -DARROW_BUILD_SHARED=ON -DARROW_BOOST_USE_SHARED=OFF -DARROW_JEMALLOC=ON -DARROW_RPATH_ORIGIN=ON -DARROW_JEMALLOC_USE_SHARED=OFF -DARROW_PYTHON=ON -DPythonInterp_FIND_VERSION=${PYTHON} -DARROW_PLASMA=ON ..
     make -j5 install
     popd
 
@@ -65,6 +66,7 @@ for PYTHON in ${PYTHON_VERSIONS}; do
     echo "=== (${PYTHON}) Test the existence of optional modules ==="
     $PIPI_IO -r requirements.txt
     PATH="$PATH:$(cpython_path $PYTHON)/bin" $PYTHON_INTERPRETER -c "import pyarrow.parquet"
+    PATH="$PATH:$(cpython_path $PYTHON)/bin" $PYTHON_INTERPRETER -c "import pyarrow.plasma"
 
     echo "=== (${PYTHON}) Tag the wheel with manylinux1 ==="
     mkdir -p repaired_wheels/
@@ -78,4 +80,3 @@ for PYTHON in ${PYTHON_VERSIONS}; do
 
     mv repaired_wheels/*.whl /io/dist
 done
-

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index e3d783a..6d0ce20 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -68,6 +68,7 @@ from pyarrow.lib import (null, bool_,
                          Date32Value, Date64Value, TimestampValue)
 
 from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
+                         FixedSizeBufferOutputStream,
                          Buffer, BufferReader, BufferOutputStream,
                          OSFile, MemoryMappedFile, memory_map,
                          frombuffer,
@@ -99,7 +100,6 @@ from pyarrow.ipc import (Message, MessageReader,
                          open_file,
                          serialize_pandas, deserialize_pandas)
 
-
 localfs = LocalFilesystem.get_instance()
 
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/error.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi
index 259aeb0..8a3f57d 100644
--- a/python/pyarrow/error.pxi
+++ b/python/pyarrow/error.pxi
@@ -48,6 +48,18 @@ class ArrowNotImplementedError(NotImplementedError, ArrowException):
     pass
 
 
+class PlasmaObjectExists(ArrowException):
+    pass
+
+
+class PlasmaObjectNonexistent(ArrowException):
+    pass
+
+
+class PlasmaStoreFull(ArrowException):
+    pass
+
+
 cdef int check_status(const CStatus& status) nogil except -1:
     if status.ok():
         return 0
@@ -66,5 +78,11 @@ cdef int check_status(const CStatus& status) nogil except -1:
             raise ArrowNotImplementedError(message)
         elif status.IsTypeError():
             raise ArrowTypeError(message)
+        elif status.IsPlasmaObjectExists():
+            raise PlasmaObjectExists(message)
+        elif status.IsPlasmaObjectNonexistent():
+            raise PlasmaObjectNonexistent(message)
+        elif status.IsPlasmaStoreFull():
+            raise PlasmaStoreFull(message)
         else:
             raise ArrowException(message)

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/includes/common.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index 3487d48..637a133 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -50,6 +50,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         c_bool IsKeyError()
         c_bool IsNotImplemented()
         c_bool IsTypeError()
+        c_bool IsPlasmaObjectExists()
+        c_bool IsPlasmaObjectNonexistent()
+        c_bool IsPlasmaStoreFull()
 
 
 cdef inline object PyObject_to_object(PyObject* o):

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index edf50ad..ffe867b 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -148,9 +148,15 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
         CLoggingMemoryPool(CMemoryPool*)
 
     cdef cppclass CBuffer" arrow::Buffer":
+        CBuffer(const uint8_t* data, int64_t size)
         uint8_t* data()
         int64_t size()
         shared_ptr[CBuffer] parent()
+        c_bool is_mutable() const
+
+    cdef cppclass CMutableBuffer" arrow::MutableBuffer"(CBuffer):
+        CMutableBuffer(const uint8_t* data, int64_t size)
+        uint8_t* mutable_data()
 
     cdef cppclass ResizableBuffer(CBuffer):
         CStatus Resize(int64_t nbytes)
@@ -558,6 +564,9 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
         CMockOutputStream()
         int64_t GetExtentBytesWritten()
 
+    cdef cppclass CFixedSizeBufferWriter" arrow::io::FixedSizeBufferWriter"(WriteableFile):
+        CFixedSizeBufferWriter(const shared_ptr[CBuffer]& buffer)
+
 
 cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
     enum MessageType" arrow::ipc::Message::Type":

http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 8b213a3..181b0b1 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -473,6 +473,15 @@ cdef class OSFile(NativeFile):
         self.wr_file = <shared_ptr[OutputStream]> handle
 
 
+cdef class FixedSizeBufferOutputStream(NativeFile):
+
+    def __cinit__(self, Buffer buffer):
+        self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer))
+        self.is_readable = 0
+        self.is_writeable = 1
+        self.is_open = True
+
+
 # ----------------------------------------------------------------------
 # Arrow buffers
 
@@ -523,7 +532,10 @@ cdef class Buffer:
         buffer.len = self.size
         buffer.ndim = 1
         buffer.obj = self
-        buffer.readonly = 1
+        if self.buffer.get().is_mutable():
+            buffer.readonly = 0
+        else:
+            buffer.readonly = 1
         buffer.shape = self.shape
         buffer.strides = self.strides
         buffer.suboffsets = NULL
@@ -540,6 +552,15 @@ cdef class Buffer:
             p[0] = <void*> self.buffer.get().data()
         return self.size
 
+    def __getwritebuffer__(self, Py_ssize_t idx, void **p):
+        if not self.buffer.get().is_mutable():
+            raise SystemError("trying to write an immutable buffer")
+        if idx != 0:
+            raise SystemError("accessing non-existent buffer segment")
+        if p != NULL:
+            p[0] = <void*> self.buffer.get().data()
+        return self.size
+
 
 cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
     cdef shared_ptr[PoolBuffer] result