You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/24 16:12:49 UTC
[2/2] arrow git commit: ARROW-1149: [Plasma] Create Cython client library for Plasma
ARROW-1149: [Plasma] Create Cython client library for Plasma
This PR introduces a Cython API to Plasma and a FindPlasma.cmake module that makes it easier to integrate Plasma with CMake projects, and it sets up packaging of Plasma with pyarrow.
Author: Philipp Moritz <pc...@gmail.com>
Author: Robert Nishihara <ro...@gmail.com>
Closes #797 from pcmoritz/plasma-cython and squashes the following commits:
d8319fc [Philipp Moritz] get rid of PlasmaClient.connect
d14ab87 [Philipp Moritz] get rid of MutableBuffer
08f24a5 [Philipp Moritz] fix typos and move FixedSizeBufferOutputStream
e33443d [Philipp Moritz] fix setup.py develop for plasma
5f7b779 [Philipp Moritz] changes needed to make Ray work with Plasma in Arrow
b9e2dee [Philipp Moritz] fix windows build
3e4a84d [Philipp Moritz] fix segfault
0bea267 [Philipp Moritz] debug
23fe5f5 [Philipp Moritz] make plasma store binary part of the pyarrow package for tests
b863d13 [Philipp Moritz] fix
997de1e [Philipp Moritz] fix
47dc739 [Philipp Moritz] fixes
47033e7 [Philipp Moritz] switch to pytest
ed84c53 [Philipp Moritz] partial fixes
9bc5c15 [Philipp Moritz] implement wait and fetch for the client
45f338f [Philipp Moritz] test plasma on macOS
8b53618 [Philipp Moritz] fix
54f595e [Philipp Moritz] try fixing python 2 tests
2c6d652 [Philipp Moritz] convert docs to numpy format
3270628 [Philipp Moritz] try to get documentation up
44d1a55 [Philipp Moritz] cleanups and release GIL
a9f6502 [Philipp Moritz] more fixes
1ff88e7 [Philipp Moritz] fix travis ci
348f9bf [Philipp Moritz] fixes
4ae1a27 [Philipp Moritz] fix
fd80203 [Philipp Moritz] Plasma Python extension packaging: It compiles!
3b69973 [Robert Nishihara] Fixed minor python linting.
c9f6bcf [Robert Nishihara] Fix indentation and line lengths in plasma.pyx.
67b0951 [Robert Nishihara] Fix long lines in plasma/test/test.py.
e26527c [Robert Nishihara] Convert plasma test.py from 2 space indentation to 4 space indentation.
acc71d2 [Philipp Moritz] add round trip test for dataframes
2b7f949 [Philipp Moritz] implement mutable arrow python buffers
c06f1b5 [Philipp Moritz] fix test
1d7928f [Philipp Moritz] add arrow roundtrip test
6371e2e [Philipp Moritz] fix tests
3021d59 [Philipp Moritz] make ObjectID pickleable
dd5a7d8 [Philipp Moritz] fix tests
777e9c7 [Philipp Moritz] introduce plasma namespace
a4a9628 [Philipp Moritz] fix c++ tests
924888b [Philipp Moritz] update
f970df3 [Philipp Moritz] reduce logging
2ff2480 [Philipp Moritz] workaround for python visibility
d4934a9 [Philipp Moritz] update
cba92c1 [Philipp Moritz] setup.py for plasma
066d0ea [Philipp Moritz] test
1aea320 [Philipp Moritz] run plasma tests
3c4de52 [Philipp Moritz] use cmake to build the cython extension
bf39297 [Philipp Moritz] build and install pyarrow for plasma tests
5bf722a [Philipp Moritz] fix plasma path
1c5434c [Philipp Moritz] fix formatting
187cc24 [Philipp Moritz] add travis tests
c3d462d [Philipp Moritz] remove Python C extension
d9261b4 [Philipp Moritz] add documentation and license
db2d09a [Philipp Moritz] get all python tests in place
78d08ac [Philipp Moritz] make eviction work in Cython
18e0ac4 [Philipp Moritz] get tests
bc681ca [Philipp Moritz] port some python tests
f8e05f2 [Philipp Moritz] implement plasma.get in the cython client
d590c8a [Philipp Moritz] update
5178ee7 [Philipp Moritz] update
9044a01 [Philipp Moritz] initial plasma cython client commit
Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/a94f4716
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/a94f4716
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/a94f4716
Branch: refs/heads/master
Commit: a94f4716be8c33e86222d5a0be5a4d2a9102b93d
Parents: 05f7058
Author: Philipp Moritz <pc...@gmail.com>
Authored: Mon Jul 24 12:12:42 2017 -0400
Committer: Wes McKinney <we...@twosigma.com>
Committed: Mon Jul 24 12:12:42 2017 -0400
----------------------------------------------------------------------
.travis.yml | 21 +
ci/travis_script_manylinux.sh | 2 +-
ci/travis_script_plasma.sh | 97 ++++
ci/travis_script_python.sh | 4 +-
cpp/src/arrow/util/logging.h | 6 +-
cpp/src/plasma/CMakeLists.txt | 51 +-
cpp/src/plasma/client.cc | 86 ++-
cpp/src/plasma/client.h | 62 ++-
cpp/src/plasma/common.cc | 9 +-
cpp/src/plasma/common.h | 39 +-
cpp/src/plasma/events.cc | 4 +
cpp/src/plasma/events.h | 4 +
cpp/src/plasma/eviction_policy.cc | 4 +
cpp/src/plasma/eviction_policy.h | 4 +
cpp/src/plasma/extension.cc | 456 ----------------
cpp/src/plasma/extension.h | 50 --
cpp/src/plasma/plasma.cc | 4 +
cpp/src/plasma/plasma.h | 53 +-
cpp/src/plasma/plasma.pc.in | 30 ++
cpp/src/plasma/protocol.cc | 6 +-
cpp/src/plasma/protocol.h | 6 +-
cpp/src/plasma/store.cc | 12 +-
cpp/src/plasma/store.h | 4 +
cpp/src/plasma/test/client_tests.cc | 10 +-
cpp/src/plasma/test/serialization_tests.cc | 4 +
python/CMakeLists.txt | 18 +
python/cmake_modules/FindPlasma.cmake | 99 ++++
python/doc/source/api.rst | 15 +
python/manylinux1/build_arrow.sh | 5 +-
python/pyarrow/__init__.py | 2 +-
python/pyarrow/error.pxi | 18 +
python/pyarrow/includes/common.pxd | 3 +
python/pyarrow/includes/libarrow.pxd | 9 +
python/pyarrow/io.pxi | 23 +-
python/pyarrow/plasma.pyx | 560 +++++++++++++++++++
python/pyarrow/tests/conftest.py | 8 +-
python/pyarrow/tests/test_plasma.py | 683 ++++++++++++++++++++++++
python/setup.py | 15 +
38 files changed, 1855 insertions(+), 631 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index cdf787c..9cc2b86 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -120,6 +120,27 @@ matrix:
- $TRAVIS_BUILD_DIR/ci/travis_before_script_c_glib.sh
script:
- $TRAVIS_BUILD_DIR/ci/travis_script_c_glib.sh
+ - compiler: gcc
+ language: cpp
+ os: linux
+ group: deprecated
+ before_script:
+ - export CC="gcc-4.9"
+ - export CXX="g++-4.9"
+ - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
+ script:
+ - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh
+ - $TRAVIS_BUILD_DIR/ci/travis_script_plasma.sh
+ - compiler: clang
+ osx_image: xcode6.4
+ os: osx
+ cache:
+ addons:
+ before_script:
+ - $TRAVIS_BUILD_DIR/ci/travis_before_script_cpp.sh
+ script:
+ - $TRAVIS_BUILD_DIR/ci/travis_script_cpp.sh
+ - $TRAVIS_BUILD_DIR/ci/travis_script_plasma.sh
before_install:
- ulimit -c unlimited -S
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/ci/travis_script_manylinux.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_manylinux.sh b/ci/travis_script_manylinux.sh
index 4e6be62..844d5f7 100755
--- a/ci/travis_script_manylinux.sh
+++ b/ci/travis_script_manylinux.sh
@@ -18,4 +18,4 @@ set -ex
pushd python/manylinux1
git clone ../../ arrow
docker build -t arrow-base-x86_64 -f Dockerfile-x86_64 .
-docker run --rm -e PYARROW_PARALLEL=3 -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh
+docker run --shm-size=2g --rm -e PYARROW_PARALLEL=3 -v $PWD:/io arrow-base-x86_64 /io/build_arrow.sh
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/ci/travis_script_plasma.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_plasma.sh b/ci/travis_script_plasma.sh
new file mode 100755
index 0000000..fa384ad
--- /dev/null
+++ b/ci/travis_script_plasma.sh
@@ -0,0 +1,97 @@
+#!/usr/bin/env bash
+
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License. See accompanying LICENSE file.
+
+set -e
+
+source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh
+
+export ARROW_HOME=$ARROW_CPP_INSTALL
+export PYARROW_WITH_PLASMA=1
+
+pushd $ARROW_PYTHON_DIR
+
+function build_arrow_libraries() {
+ CPP_BUILD_DIR=$1
+ CPP_DIR=$TRAVIS_BUILD_DIR/cpp
+
+ mkdir $CPP_BUILD_DIR
+ pushd $CPP_BUILD_DIR
+
+ cmake -DARROW_BUILD_TESTS=off \
+ -DARROW_PYTHON=on \
+ -DARROW_PLASMA=on \
+ -DCMAKE_INSTALL_PREFIX=$2 \
+ $CPP_DIR
+
+ make -j4
+ make install
+
+ popd
+}
+
+python_version_tests() {
+ PYTHON_VERSION=$1
+ CONDA_ENV_DIR=$TRAVIS_BUILD_DIR/pyarrow-test-$PYTHON_VERSION
+
+ export ARROW_HOME=$TRAVIS_BUILD_DIR/arrow-install-$PYTHON_VERSION
+ export LD_LIBRARY_PATH=$ARROW_HOME/lib:$PARQUET_HOME/lib
+
+ conda create -y -q -p $CONDA_ENV_DIR python=$PYTHON_VERSION cmake curl
+ source activate $CONDA_ENV_DIR
+
+ python --version
+ which python
+
+ # faster builds, please
+ conda install -y -q nomkl
+
+ # Expensive dependencies install from Continuum package repo
+ conda install -y -q pip numpy pandas cython
+
+ # Build C++ libraries
+ build_arrow_libraries arrow-build-$PYTHON_VERSION $ARROW_HOME
+
+ # Other stuff pip install
+ pip install -r requirements.txt
+
+ python setup.py build_ext --inplace
+
+ python -m pytest -vv -r sxX pyarrow
+
+ # Build documentation once
+ if [[ "$PYTHON_VERSION" == "3.6" ]]
+ then
+ conda install -y -q --file=doc/requirements.txt
+ python setup.py build_sphinx -s doc/source
+ fi
+
+ # Build and install pyarrow
+ pushd $TRAVIS_BUILD_DIR/python
+ python setup.py install
+ popd
+
+ # Run Plasma tests
+ pushd $TRAVIS_BUILD_DIR/python
+ python -m pytest pyarrow/tests/test_plasma.py
+ if [ $TRAVIS_OS_NAME == "linux" ]; then
+ PLASMA_VALGRIND=1 python -m pytest pyarrow/tests/test_plasma.py
+ fi
+ popd
+}
+
+# run tests for python 2.7 and 3.6
+python_version_tests 2.7
+python_version_tests 3.6
+
+popd
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/ci/travis_script_python.sh
----------------------------------------------------------------------
diff --git a/ci/travis_script_python.sh b/ci/travis_script_python.sh
index ac64c54..fdb5ad6 100755
--- a/ci/travis_script_python.sh
+++ b/ci/travis_script_python.sh
@@ -17,6 +17,7 @@ set -e
source $TRAVIS_BUILD_DIR/ci/travis_env_common.sh
export ARROW_HOME=$ARROW_CPP_INSTALL
+export PYARROW_WITH_PLASMA=1
pushd $ARROW_PYTHON_DIR
export PARQUET_HOME=$TRAVIS_BUILD_DIR/parquet-env
@@ -71,9 +72,8 @@ function build_arrow_libraries() {
pushd $CPP_BUILD_DIR
cmake -DARROW_BUILD_TESTS=off \
- -DARROW_PYTHON=on \
- -DPLASMA_PYTHON=on \
-DARROW_PLASMA=on \
+ -DARROW_PYTHON=on \
-DCMAKE_INSTALL_PREFIX=$2 \
$CPP_DIR
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/arrow/util/logging.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/util/logging.h b/cpp/src/arrow/util/logging.h
index b618121..0edaa9d 100644
--- a/cpp/src/arrow/util/logging.h
+++ b/cpp/src/arrow/util/logging.h
@@ -113,8 +113,10 @@ class CerrLog {
template <class T>
CerrLog& operator<<(const T& t) {
- has_logged_ = true;
- std::cerr << t;
+ if (severity_ != ARROW_DEBUG) {
+ has_logged_ = true;
+ std::cerr << t;
+ }
return *this;
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
index 4ff3beb..8bb7e71 100644
--- a/cpp/src/plasma/CMakeLists.txt
+++ b/cpp/src/plasma/CMakeLists.txt
@@ -19,16 +19,13 @@ cmake_minimum_required(VERSION 2.8)
project(plasma)
+set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${CMAKE_SOURCE_DIR}/../python/cmake_modules")
+
find_package(PythonLibsNew REQUIRED)
find_package(Threads)
-option(PLASMA_PYTHON
- "Build the Plasma Python extensions"
- OFF)
-
-if(APPLE)
- SET(CMAKE_SHARED_LIBRARY_SUFFIX ".so")
-endif(APPLE)
+set(PLASMA_SO_VERSION "0")
+set(PLASMA_ABI_VERSION "${PLASMA_SO_VERSION}.0.0")
include_directories(SYSTEM ${PYTHON_INCLUDE_DIRS})
include_directories("${FLATBUFFERS_INCLUDE_DIR}" "${CMAKE_CURRENT_LIST_DIR}/" "${CMAKE_CURRENT_LIST_DIR}/thirdparty/" "${CMAKE_CURRENT_LIST_DIR}/../")
@@ -40,7 +37,7 @@ set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-conversion")
# Compile flatbuffers
set(PLASMA_FBS_SRC "${CMAKE_CURRENT_LIST_DIR}/format/plasma.fbs" "${CMAKE_CURRENT_LIST_DIR}/format/common.fbs")
-set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/format/)
+set(OUTPUT_DIR ${CMAKE_CURRENT_LIST_DIR}/)
set(PLASMA_FBS_OUTPUT_FILES
"${OUTPUT_DIR}/common_generated.h"
@@ -69,8 +66,6 @@ endif()
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
-set_source_files_properties(extension.cc PROPERTIES COMPILE_FLAGS -Wno-strict-aliasing)
-
set(PLASMA_SRCS
client.cc
common.cc
@@ -97,17 +92,33 @@ set_source_files_properties(malloc.cc PROPERTIES COMPILE_FLAGS "-Wno-error -O3")
add_executable(plasma_store store.cc)
target_link_libraries(plasma_store plasma_static)
+# Headers: top level
+install(FILES
+ common.h
+ common_generated.h
+ client.h
+ events.h
+ plasma.h
+ plasma_generated.h
+ protocol.h
+ DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/plasma")
+
+# Plasma store
+install(TARGETS plasma_store DESTINATION ${CMAKE_INSTALL_BINDIR})
+
+# pkg-config support
+configure_file(plasma.pc.in
+ "${CMAKE_CURRENT_BINARY_DIR}/plasma.pc"
+ @ONLY)
+install(
+ FILES "${CMAKE_CURRENT_BINARY_DIR}/plasma.pc"
+ DESTINATION "${CMAKE_INSTALL_LIBDIR}/pkgconfig/")
+
+#######################################
+# Unit tests
+#######################################
+
ADD_ARROW_TEST(test/serialization_tests)
ARROW_TEST_LINK_LIBRARIES(test/serialization_tests plasma_static)
ADD_ARROW_TEST(test/client_tests)
ARROW_TEST_LINK_LIBRARIES(test/client_tests plasma_static)
-
-if(PLASMA_PYTHON)
- add_library(plasma_extension SHARED extension.cc)
-
- if(APPLE)
- target_link_libraries(plasma_extension plasma_static "-undefined dynamic_lookup")
- else(APPLE)
- target_link_libraries(plasma_extension plasma_static -Wl,--whole-archive ${FLATBUFFERS_STATIC_LIB} -Wl,--no-whole-archive)
- endif(APPLE)
-endif()
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/client.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index dcb78e7..62bfbec 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -51,11 +51,31 @@
#define XXH64_DEFAULT_SEED 0
+namespace plasma {
+
// Number of threads used for memcopy and hash computations.
constexpr int64_t kThreadPoolSize = 8;
constexpr int64_t kBytesInMB = 1 << 20;
static std::vector<std::thread> threadpool_(kThreadPoolSize);
+struct ObjectInUseEntry {
+ /// A count of the number of times this client has called PlasmaClient::Create
+ /// or
+ /// PlasmaClient::Get on this object ID minus the number of calls to
+ /// PlasmaClient::Release.
+ /// When this count reaches zero, we remove the entry from the ObjectsInUse
+ /// and decrement a count in the relevant ClientMmapTableEntry.
+ int count;
+ /// Cached information to read the object.
+ PlasmaObject object;
+ /// A flag representing whether the object has been sealed.
+ bool is_sealed;
+};
+
+PlasmaClient::PlasmaClient() {}
+
+PlasmaClient::~PlasmaClient() {}
+
// If the file descriptor fd has been mmapped in this client process before,
// return the pointer that was returned by mmap, otherwise mmap it and store the
// pointer in a hash table.
@@ -300,6 +320,10 @@ Status PlasmaClient::PerformRelease(const ObjectID& object_id) {
}
Status PlasmaClient::Release(const ObjectID& object_id) {
+ // If the client is already disconnected, ignore release requests.
+ if (store_conn_ < 0) {
+ return Status::OK();
+ }
// Add the new object to the release history.
release_history_.push_front(object_id);
// If there are too many bytes in use by the client or if there are too many
@@ -386,22 +410,6 @@ static uint64_t compute_object_hash(const ObjectBuffer& obj_buffer) {
return XXH64_digest(&hash_state);
}
-bool plasma_compute_object_hash(
- PlasmaClient* conn, ObjectID object_id, unsigned char* digest) {
- // Get the plasma object data. We pass in a timeout of 0 to indicate that
- // the operation should timeout immediately.
- ObjectBuffer object_buffer;
- ARROW_CHECK_OK(conn->Get(&object_id, 1, 0, &object_buffer));
- // If the object was not retrieved, return false.
- if (object_buffer.data_size == -1) { return false; }
- // Compute the hash.
- uint64_t hash = compute_object_hash(object_buffer);
- memcpy(digest, &hash, sizeof(hash));
- // Release the plasma object.
- ARROW_CHECK_OK(conn->Release(object_id));
- return true;
-}
-
Status PlasmaClient::Seal(const ObjectID& object_id) {
// Make sure this client has a reference to the object before sending the
// request to Plasma.
@@ -413,7 +421,7 @@ Status PlasmaClient::Seal(const ObjectID& object_id) {
object_entry->second->is_sealed = true;
/// Send the seal request to Plasma.
static unsigned char digest[kDigestSize];
- ARROW_CHECK(plasma_compute_object_hash(this, object_id, &digest[0]));
+ RETURN_NOT_OK(Hash(object_id, &digest[0]));
RETURN_NOT_OK(SendSealRequest(store_conn_, object_id, &digest[0]));
// We call PlasmaClient::Release to decrement the number of instances of this
// object
@@ -439,6 +447,22 @@ Status PlasmaClient::Evict(int64_t num_bytes, int64_t& num_bytes_evicted) {
return ReadEvictReply(buffer.data(), num_bytes_evicted);
}
+Status PlasmaClient::Hash(const ObjectID& object_id, uint8_t* digest) {
+ // Get the plasma object data. We pass in a timeout of 0 to indicate that
+ // the operation should timeout immediately.
+ ObjectBuffer object_buffer;
+ RETURN_NOT_OK(Get(&object_id, 1, 0, &object_buffer));
+ // If the object was not retrieved, return an error status.
+ if (object_buffer.data_size == -1) {
+ return Status::PlasmaObjectNonexistent("Object not found");
+ }
+ // Compute the hash.
+ uint64_t hash = compute_object_hash(object_buffer);
+ memcpy(digest, &hash, sizeof(hash));
+ // Release the plasma object.
+ return Release(object_id);
+}
+
Status PlasmaClient::Subscribe(int* fd) {
int sock[2];
// Create a non-blocking socket pair. This will only be used to send
@@ -459,6 +483,26 @@ Status PlasmaClient::Subscribe(int* fd) {
return Status::OK();
}
+Status PlasmaClient::GetNotification(
+ int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
+ uint8_t* notification = read_message_async(fd);
+ if (notification == NULL) {
+ return Status::IOError("Failed to read object notification from Plasma socket");
+ }
+ auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
+ ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
+ memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
+ if (object_info->is_deletion()) {
+ *data_size = -1;
+ *metadata_size = -1;
+ } else {
+ *data_size = object_info->data_size();
+ *metadata_size = object_info->metadata_size();
+ }
+ delete[] notification;
+ return Status::OK();
+}
+
Status PlasmaClient::Connect(const std::string& store_socket_name,
const std::string& manager_socket_name, int release_delay) {
store_conn_ = connect_ipc_sock_retry(store_socket_name, -1, -1);
@@ -485,7 +529,11 @@ Status PlasmaClient::Disconnect() {
// Close the connections to Plasma. The Plasma store will release the objects
// that were in use by us when handling the SIGPIPE.
close(store_conn_);
- if (manager_conn_ >= 0) { close(manager_conn_); }
+ store_conn_ = -1;
+ if (manager_conn_ >= 0) {
+ close(manager_conn_);
+ manager_conn_ = -1;
+ }
return Status::OK();
}
@@ -555,3 +603,5 @@ Status PlasmaClient::Wait(int64_t num_object_requests, ObjectRequest* object_req
}
return Status::OK();
}
+
+} // namespace plasma
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/client.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index fb3a161..d9ed9f7 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -22,12 +22,18 @@
#include <time.h>
#include <deque>
+#include <memory>
#include <string>
+#include <unordered_map>
-#include "plasma/plasma.h"
+#include "arrow/status.h"
+#include "arrow/util/visibility.h"
+#include "plasma/common.h"
using arrow::Status;
+namespace plasma {
+
#define PLASMA_DEFAULT_RELEASE_DELAY 64
// Use 100MB as an overestimate of the L3 cache size.
@@ -63,22 +69,16 @@ struct ClientMmapTableEntry {
int count;
};
-struct ObjectInUseEntry {
- /// A count of the number of times this client has called PlasmaClient::Create
- /// or
- /// PlasmaClient::Get on this object ID minus the number of calls to
- /// PlasmaClient::Release.
- /// When this count reaches zero, we remove the entry from the ObjectsInUse
- /// and decrement a count in the relevant ClientMmapTableEntry.
- int count;
- /// Cached information to read the object.
- PlasmaObject object;
- /// A flag representing whether the object has been sealed.
- bool is_sealed;
-};
+struct ObjectInUseEntry;
+struct ObjectRequest;
+struct PlasmaObject;
-class PlasmaClient {
+class ARROW_EXPORT PlasmaClient {
public:
+ PlasmaClient();
+
+ ~PlasmaClient();
+
/// Connect to the local plasma store and plasma manager. Return
/// the resulting connection.
///
@@ -177,10 +177,18 @@ class PlasmaClient {
/// @return The return status.
Status Evict(int64_t num_bytes, int64_t& num_bytes_evicted);
+ /// Compute the hash of an object in the object store.
+ ///
+ /// @param conn The object containing the connection state.
+ /// @param object_id The ID of the object we want to hash.
+ /// @param digest A pointer at which to return the hash digest of the object.
+ /// The pointer must have at least kDigestSize bytes allocated.
+ /// @return The return status.
+ Status Hash(const ObjectID& object_id, uint8_t* digest);
+
/// Subscribe to notifications when objects are sealed in the object store.
/// Whenever an object is sealed, a message will be written to the client
- /// socket
- /// that is returned by this method.
+ /// socket that is returned by this method.
///
/// @param fd Out parameter for the file descriptor the client should use to
/// read notifications
@@ -188,6 +196,16 @@ class PlasmaClient {
/// @return The return status.
Status Subscribe(int* fd);
+ /// Receive next object notification for this client if Subscribe has been called.
+ ///
+ /// @param fd The file descriptor we are reading the notification from.
+ /// @param object_id Out parameter, the object_id of the object that was sealed.
+ /// @param data_size Out parameter, the data size of the object that was sealed.
+ /// @param metadata_size Out parameter, the metadata size of the object that was sealed.
+ /// @return The return status.
+ Status GetNotification(
+ int fd, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size);
+
/// Disconnect from the local plasma instance, including the local store and
/// manager.
///
@@ -330,14 +348,6 @@ class PlasmaClient {
int64_t store_capacity_;
};
-/// Compute the hash of an object in the object store.
-///
-/// @param conn The object containing the connection state.
-/// @param object_id The ID of the object we want to hash.
-/// @param digest A pointer at which to return the hash digest of the object.
-/// The pointer must have at least DIGEST_SIZE bytes allocated.
-/// @return A boolean representing whether the hash operation succeeded.
-bool plasma_compute_object_hash(
- PlasmaClient* conn, ObjectID object_id, unsigned char* digest);
+} // namespace plasma
#endif // PLASMA_CLIENT_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/common.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
index a09a963..a5f530e 100644
--- a/cpp/src/plasma/common.cc
+++ b/cpp/src/plasma/common.cc
@@ -19,7 +19,9 @@
#include <random>
-#include "format/plasma_generated.h"
+#include "plasma/plasma_generated.h"
+
+namespace plasma {
using arrow::Status;
@@ -81,3 +83,8 @@ Status plasma_error_status(int plasma_error) {
}
return Status::OK();
}
+
+ARROW_EXPORT int ObjectStatusLocal = ObjectStatus_Local;
+ARROW_EXPORT int ObjectStatusRemote = ObjectStatus_Remote;
+
+} // namespace plasma
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/common.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index 85dc74b..6f2d4dd 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -29,9 +29,11 @@
#include "arrow/status.h"
#include "arrow/util/logging.h"
+namespace plasma {
+
constexpr int64_t kUniqueIDSize = 20;
-class UniqueID {
+class ARROW_EXPORT UniqueID {
public:
static UniqueID from_random();
static UniqueID from_binary(const std::string& binary);
@@ -60,4 +62,39 @@ typedef UniqueID ObjectID;
arrow::Status plasma_error_status(int plasma_error);
+/// Size of object hash digests.
+constexpr int64_t kDigestSize = sizeof(uint64_t);
+
+/// Object request data structure. Used for Wait.
+struct ObjectRequest {
+ /// The ID of the requested object. If ID_NIL request any object.
+ ObjectID object_id;
+ /// Request associated to the object. It can take one of the following values:
+ /// - PLASMA_QUERY_LOCAL: return if or when the object is available in the
+ /// local Plasma Store.
+ /// - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
+ /// the system (i.e., either in the local or a remote Plasma Store).
+ int type;
+ /// Object status. Same as the status returned by plasma_status() function
+ /// call. This is filled in by plasma_wait_for_objects1():
+ /// - ObjectStatus_Local: object is ready at the local Plasma Store.
+ /// - ObjectStatus_Remote: object is ready at a remote Plasma Store.
+ /// - ObjectStatus_Nonexistent: object does not exist in the system.
+ /// - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
+ /// for being transferred or it is transferring.
+ int status;
+};
+
+enum ObjectRequestType {
+ /// Query for object in the local plasma store.
+ PLASMA_QUERY_LOCAL = 1,
+ /// Query for object in the local plasma store or in a remote plasma store.
+ PLASMA_QUERY_ANYWHERE
+};
+
+extern int ObjectStatusLocal;
+extern int ObjectStatusRemote;
+
+} // namespace plasma
+
#endif // PLASMA_COMMON_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/events.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.cc b/cpp/src/plasma/events.cc
index a9f7356..675424d 100644
--- a/cpp/src/plasma/events.cc
+++ b/cpp/src/plasma/events.cc
@@ -19,6 +19,8 @@
#include <errno.h>
+namespace plasma {
+
void EventLoop::file_event_callback(
aeEventLoop* loop, int fd, void* context, int events) {
FileCallback* callback = reinterpret_cast<FileCallback*>(context);
@@ -79,3 +81,5 @@ int EventLoop::remove_timer(int64_t timer_id) {
timer_callbacks_.erase(timer_id);
return err;
}
+
+} // namespace plasma
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/events.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/events.h b/cpp/src/plasma/events.h
index bd93d6b..b989b7f 100644
--- a/cpp/src/plasma/events.h
+++ b/cpp/src/plasma/events.h
@@ -26,6 +26,8 @@ extern "C" {
#include "ae/ae.h"
}
+namespace plasma {
+
/// Constant specifying that the timer is done and it will be removed.
constexpr int kEventLoopTimerDone = AE_NOMORE;
@@ -96,4 +98,6 @@ class EventLoop {
std::unordered_map<int64_t, std::unique_ptr<TimerCallback>> timer_callbacks_;
};
+} // namespace plasma
+
#endif // PLASMA_EVENTS
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/eviction_policy.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
index 4ae6384..ef18e33 100644
--- a/cpp/src/plasma/eviction_policy.cc
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -19,6 +19,8 @@
#include <algorithm>
+namespace plasma {
+
void LRUCache::add(const ObjectID& key, int64_t size) {
auto it = item_map_.find(key);
ARROW_CHECK(it == item_map_.end());
@@ -105,3 +107,5 @@ void EvictionPolicy::end_object_access(
/* Add the object to the LRU cache.*/
cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
}
+
+} // namespace plasma
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/eviction_policy.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
index 3815fc6..c4f2183 100644
--- a/cpp/src/plasma/eviction_policy.h
+++ b/cpp/src/plasma/eviction_policy.h
@@ -26,6 +26,8 @@
#include "plasma/common.h"
#include "plasma/plasma.h"
+namespace plasma {
+
// ==== The eviction policy ====
//
// This file contains declaration for all functions and data structures that
@@ -131,4 +133,6 @@ class EvictionPolicy {
LRUCache cache_;
};
+} // namespace plasma
+
#endif // PLASMA_EVICTION_POLICY_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/extension.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.cc b/cpp/src/plasma/extension.cc
deleted file mode 100644
index 5d61e33..0000000
--- a/cpp/src/plasma/extension.cc
+++ /dev/null
@@ -1,456 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/extension.h"
-
-#include <algorithm>
-#include <vector>
-
-#include "plasma/client.h"
-#include "plasma/common.h"
-#include "plasma/io.h"
-#include "plasma/protocol.h"
-
-PyObject* PlasmaOutOfMemoryError;
-PyObject* PlasmaObjectExistsError;
-
-PyObject* PyPlasma_connect(PyObject* self, PyObject* args) {
- const char* store_socket_name;
- const char* manager_socket_name;
- int release_delay;
- if (!PyArg_ParseTuple(
- args, "ssi", &store_socket_name, &manager_socket_name, &release_delay)) {
- return NULL;
- }
- PlasmaClient* client = new PlasmaClient();
- ARROW_CHECK_OK(client->Connect(store_socket_name, manager_socket_name, release_delay));
-
- return PyCapsule_New(client, "plasma", NULL);
-}
-
-PyObject* PyPlasma_disconnect(PyObject* self, PyObject* args) {
- PyObject* client_capsule;
- if (!PyArg_ParseTuple(args, "O", &client_capsule)) { return NULL; }
- PlasmaClient* client;
- ARROW_CHECK(PyObjectToPlasmaClient(client_capsule, &client));
- ARROW_CHECK_OK(client->Disconnect());
- /* We use the context of the connection capsule to indicate if the connection
- * is still active (if the context is NULL) or if it is closed (if the context
- * is (void*) 0x1). This is neccessary because the primary pointer of the
- * capsule cannot be NULL. */
- PyCapsule_SetContext(client_capsule, reinterpret_cast<void*>(0x1));
- Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_create(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- ObjectID object_id;
- Py_ssize_t size;
- PyObject* metadata;
- if (!PyArg_ParseTuple(args, "O&O&nO", PyObjectToPlasmaClient, &client,
- PyStringToUniqueID, &object_id, &size, &metadata)) {
- return NULL;
- }
- if (!PyByteArray_Check(metadata)) {
- PyErr_SetString(PyExc_TypeError, "metadata must be a bytearray");
- return NULL;
- }
- uint8_t* data;
- Status s = client->Create(object_id, size,
- reinterpret_cast<uint8_t*>(PyByteArray_AsString(metadata)),
- PyByteArray_Size(metadata), &data);
- if (s.IsPlasmaObjectExists()) {
- PyErr_SetString(PlasmaObjectExistsError,
- "An object with this ID already exists in the plasma "
- "store.");
- return NULL;
- }
- if (s.IsPlasmaStoreFull()) {
- PyErr_SetString(PlasmaOutOfMemoryError,
- "The plasma store ran out of memory and could not create "
- "this object.");
- return NULL;
- }
- ARROW_CHECK(s.ok());
-
-#if PY_MAJOR_VERSION >= 3
- return PyMemoryView_FromMemory(reinterpret_cast<char*>(data), size, PyBUF_WRITE);
-#else
- return PyBuffer_FromReadWriteMemory(reinterpret_cast<void*>(data), size);
-#endif
-}
-
-PyObject* PyPlasma_hash(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- ObjectID object_id;
- if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
- &object_id)) {
- return NULL;
- }
- unsigned char digest[kDigestSize];
- bool success = plasma_compute_object_hash(client, object_id, digest);
- if (success) {
- PyObject* digest_string =
- PyBytes_FromStringAndSize(reinterpret_cast<char*>(digest), kDigestSize);
- return digest_string;
- } else {
- Py_RETURN_NONE;
- }
-}
-
-PyObject* PyPlasma_seal(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- ObjectID object_id;
- if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
- &object_id)) {
- return NULL;
- }
- ARROW_CHECK_OK(client->Seal(object_id));
- Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_release(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- ObjectID object_id;
- if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
- &object_id)) {
- return NULL;
- }
- ARROW_CHECK_OK(client->Release(object_id));
- Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_get(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- PyObject* object_id_list;
- Py_ssize_t timeout_ms;
- if (!PyArg_ParseTuple(
- args, "O&On", PyObjectToPlasmaClient, &client, &object_id_list, &timeout_ms)) {
- return NULL;
- }
-
- Py_ssize_t num_object_ids = PyList_Size(object_id_list);
- std::vector<ObjectID> object_ids(num_object_ids);
- std::vector<ObjectBuffer> object_buffers(num_object_ids);
-
- for (int i = 0; i < num_object_ids; ++i) {
- PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]);
- }
-
- Py_BEGIN_ALLOW_THREADS;
- ARROW_CHECK_OK(
- client->Get(object_ids.data(), num_object_ids, timeout_ms, object_buffers.data()));
- Py_END_ALLOW_THREADS;
-
- PyObject* returns = PyList_New(num_object_ids);
- for (int i = 0; i < num_object_ids; ++i) {
- if (object_buffers[i].data_size != -1) {
- /* The object was retrieved, so return the object. */
- PyObject* t = PyTuple_New(2);
- Py_ssize_t data_size = static_cast<Py_ssize_t>(object_buffers[i].data_size);
- Py_ssize_t metadata_size = static_cast<Py_ssize_t>(object_buffers[i].metadata_size);
-#if PY_MAJOR_VERSION >= 3
- char* data = reinterpret_cast<char*>(object_buffers[i].data);
- char* metadata = reinterpret_cast<char*>(object_buffers[i].metadata);
- PyTuple_SET_ITEM(t, 0, PyMemoryView_FromMemory(data, data_size, PyBUF_READ));
- PyTuple_SET_ITEM(
- t, 1, PyMemoryView_FromMemory(metadata, metadata_size, PyBUF_READ));
-#else
- void* data = reinterpret_cast<void*>(object_buffers[i].data);
- void* metadata = reinterpret_cast<void*>(object_buffers[i].metadata);
- PyTuple_SET_ITEM(t, 0, PyBuffer_FromMemory(data, data_size));
- PyTuple_SET_ITEM(t, 1, PyBuffer_FromMemory(metadata, metadata_size));
-#endif
- ARROW_CHECK(PyList_SetItem(returns, i, t) == 0);
- } else {
- /* The object was not retrieved, so just add None to the list of return
- * values. */
- Py_INCREF(Py_None);
- ARROW_CHECK(PyList_SetItem(returns, i, Py_None) == 0);
- }
- }
- return returns;
-}
-
-PyObject* PyPlasma_contains(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- ObjectID object_id;
- if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
- &object_id)) {
- return NULL;
- }
- bool has_object;
- ARROW_CHECK_OK(client->Contains(object_id, &has_object));
-
- if (has_object) {
- Py_RETURN_TRUE;
- } else {
- Py_RETURN_FALSE;
- }
-}
-
-PyObject* PyPlasma_fetch(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- PyObject* object_id_list;
- if (!PyArg_ParseTuple(args, "O&O", PyObjectToPlasmaClient, &client, &object_id_list)) {
- return NULL;
- }
- if (client->get_manager_fd() == -1) {
- PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
- return NULL;
- }
- Py_ssize_t n = PyList_Size(object_id_list);
- ObjectID* object_ids = new ObjectID[n];
- for (int i = 0; i < n; ++i) {
- PyStringToUniqueID(PyList_GetItem(object_id_list, i), &object_ids[i]);
- }
- ARROW_CHECK_OK(client->Fetch(static_cast<int>(n), object_ids));
- delete[] object_ids;
- Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_wait(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- PyObject* object_id_list;
- Py_ssize_t timeout;
- int num_returns;
- if (!PyArg_ParseTuple(args, "O&Oni", PyObjectToPlasmaClient, &client, &object_id_list,
- &timeout, &num_returns)) {
- return NULL;
- }
- Py_ssize_t n = PyList_Size(object_id_list);
-
- if (client->get_manager_fd() == -1) {
- PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
- return NULL;
- }
- if (num_returns < 0) {
- PyErr_SetString(
- PyExc_RuntimeError, "The argument num_returns cannot be less than zero.");
- return NULL;
- }
- if (num_returns > n) {
- PyErr_SetString(PyExc_RuntimeError,
- "The argument num_returns cannot be greater than len(object_ids)");
- return NULL;
- }
- int64_t threshold = 1 << 30;
- if (timeout > threshold) {
- PyErr_SetString(
- PyExc_RuntimeError, "The argument timeout cannot be greater than 2 ** 30.");
- return NULL;
- }
-
- std::vector<ObjectRequest> object_requests(n);
- for (int i = 0; i < n; ++i) {
- ARROW_CHECK(PyStringToUniqueID(PyList_GetItem(object_id_list, i),
- &object_requests[i].object_id) == 1);
- object_requests[i].type = PLASMA_QUERY_ANYWHERE;
- }
- /* Drop the global interpreter lock while we are waiting, so other threads can
- * run. */
- int num_return_objects;
- Py_BEGIN_ALLOW_THREADS;
- ARROW_CHECK_OK(
- client->Wait(n, object_requests.data(), num_returns, timeout, &num_return_objects));
- Py_END_ALLOW_THREADS;
-
- int num_to_return = std::min(num_return_objects, num_returns);
- PyObject* ready_ids = PyList_New(num_to_return);
- PyObject* waiting_ids = PySet_New(object_id_list);
- int num_returned = 0;
- for (int i = 0; i < n; ++i) {
- if (num_returned == num_to_return) { break; }
- if (object_requests[i].status == ObjectStatus_Local ||
- object_requests[i].status == ObjectStatus_Remote) {
- PyObject* ready = PyBytes_FromStringAndSize(
- reinterpret_cast<char*>(&object_requests[i].object_id),
- sizeof(object_requests[i].object_id));
- PyList_SetItem(ready_ids, num_returned, ready);
- PySet_Discard(waiting_ids, ready);
- num_returned += 1;
- } else {
- ARROW_CHECK(object_requests[i].status == ObjectStatus_Nonexistent);
- }
- }
- ARROW_CHECK(num_returned == num_to_return);
- /* Return both the ready IDs and the remaining IDs. */
- PyObject* t = PyTuple_New(2);
- PyTuple_SetItem(t, 0, ready_ids);
- PyTuple_SetItem(t, 1, waiting_ids);
- return t;
-}
-
-PyObject* PyPlasma_evict(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- Py_ssize_t num_bytes;
- if (!PyArg_ParseTuple(args, "O&n", PyObjectToPlasmaClient, &client, &num_bytes)) {
- return NULL;
- }
- int64_t evicted_bytes;
- ARROW_CHECK_OK(client->Evict(static_cast<int64_t>(num_bytes), evicted_bytes));
- return PyLong_FromSsize_t(static_cast<Py_ssize_t>(evicted_bytes));
-}
-
-PyObject* PyPlasma_delete(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- ObjectID object_id;
- if (!PyArg_ParseTuple(args, "O&O&", PyObjectToPlasmaClient, &client, PyStringToUniqueID,
- &object_id)) {
- return NULL;
- }
- ARROW_CHECK_OK(client->Delete(object_id));
- Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_transfer(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- ObjectID object_id;
- const char* addr;
- int port;
- if (!PyArg_ParseTuple(args, "O&O&si", PyObjectToPlasmaClient, &client,
- PyStringToUniqueID, &object_id, &addr, &port)) {
- return NULL;
- }
-
- if (client->get_manager_fd() == -1) {
- PyErr_SetString(PyExc_RuntimeError, "Not connected to the plasma manager");
- return NULL;
- }
-
- ARROW_CHECK_OK(client->Transfer(addr, port, object_id));
- Py_RETURN_NONE;
-}
-
-PyObject* PyPlasma_subscribe(PyObject* self, PyObject* args) {
- PlasmaClient* client;
- if (!PyArg_ParseTuple(args, "O&", PyObjectToPlasmaClient, &client)) { return NULL; }
-
- int sock;
- ARROW_CHECK_OK(client->Subscribe(&sock));
- return PyLong_FromLong(sock);
-}
-
-PyObject* PyPlasma_receive_notification(PyObject* self, PyObject* args) {
- int plasma_sock;
-
- if (!PyArg_ParseTuple(args, "i", &plasma_sock)) { return NULL; }
- /* Receive object notification from the plasma connection socket. If the
- * object was added, return a tuple of its fields: ObjectID, data_size,
- * metadata_size. If the object was deleted, data_size and metadata_size will
- * be set to -1. */
- uint8_t* notification = read_message_async(plasma_sock);
- if (notification == NULL) {
- PyErr_SetString(
- PyExc_RuntimeError, "Failed to read object notification from Plasma socket");
- return NULL;
- }
- auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification);
- /* Construct a tuple from object_info and return. */
- PyObject* t = PyTuple_New(3);
- PyTuple_SetItem(t, 0, PyBytes_FromStringAndSize(object_info->object_id()->data(),
- object_info->object_id()->size()));
- if (object_info->is_deletion()) {
- PyTuple_SetItem(t, 1, PyLong_FromLong(-1));
- PyTuple_SetItem(t, 2, PyLong_FromLong(-1));
- } else {
- PyTuple_SetItem(t, 1, PyLong_FromLong(object_info->data_size()));
- PyTuple_SetItem(t, 2, PyLong_FromLong(object_info->metadata_size()));
- }
-
- delete[] notification;
- return t;
-}
-
-static PyMethodDef plasma_methods[] = {
- {"connect", PyPlasma_connect, METH_VARARGS, "Connect to plasma."},
- {"disconnect", PyPlasma_disconnect, METH_VARARGS, "Disconnect from plasma."},
- {"create", PyPlasma_create, METH_VARARGS, "Create a new plasma object."},
- {"hash", PyPlasma_hash, METH_VARARGS, "Compute the hash of a plasma object."},
- {"seal", PyPlasma_seal, METH_VARARGS, "Seal a plasma object."},
- {"get", PyPlasma_get, METH_VARARGS, "Get a plasma object."},
- {"contains", PyPlasma_contains, METH_VARARGS,
- "Does the plasma store contain this plasma object?"},
- {"fetch", PyPlasma_fetch, METH_VARARGS,
- "Fetch the object from another plasma manager instance."},
- {"wait", PyPlasma_wait, METH_VARARGS,
- "Wait until num_returns objects in object_ids are ready."},
- {"evict", PyPlasma_evict, METH_VARARGS,
- "Evict some objects until we recover some number of bytes."},
- {"release", PyPlasma_release, METH_VARARGS, "Release the plasma object."},
- {"delete", PyPlasma_delete, METH_VARARGS, "Delete a plasma object."},
- {"transfer", PyPlasma_transfer, METH_VARARGS,
- "Transfer object to another plasma manager."},
- {"subscribe", PyPlasma_subscribe, METH_VARARGS,
- "Subscribe to the plasma notification socket."},
- {"receive_notification", PyPlasma_receive_notification, METH_VARARGS,
- "Receive next notification from plasma notification socket."},
- {NULL} /* Sentinel */
-};
-
-#if PY_MAJOR_VERSION >= 3
-static struct PyModuleDef moduledef = {
- PyModuleDef_HEAD_INIT, "libplasma", /* m_name */
- "A Python client library for plasma.", /* m_doc */
- 0, /* m_size */
- plasma_methods, /* m_methods */
- NULL, /* m_reload */
- NULL, /* m_traverse */
- NULL, /* m_clear */
- NULL, /* m_free */
-};
-#endif
-
-#if PY_MAJOR_VERSION >= 3
-#define INITERROR return NULL
-#else
-#define INITERROR return
-#endif
-
-#ifndef PyMODINIT_FUNC /* declarations for DLL import/export */
-#define PyMODINIT_FUNC void
-#endif
-
-#if PY_MAJOR_VERSION >= 3
-#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
-#else
-#define MOD_INIT(name) PyMODINIT_FUNC init##name(void)
-#endif
-
-MOD_INIT(libplasma) {
-#if PY_MAJOR_VERSION >= 3
- PyObject* m = PyModule_Create(&moduledef);
-#else
- PyObject* m =
- Py_InitModule3("libplasma", plasma_methods, "A Python client library for plasma.");
-#endif
-
- /* Create a custom exception for when an object ID is reused. */
- char plasma_object_exists_error[] = "plasma_object_exists.error";
- PlasmaObjectExistsError = PyErr_NewException(plasma_object_exists_error, NULL, NULL);
- Py_INCREF(PlasmaObjectExistsError);
- PyModule_AddObject(m, "plasma_object_exists_error", PlasmaObjectExistsError);
- /* Create a custom exception for when the plasma store is out of memory. */
- char plasma_out_of_memory_error[] = "plasma_out_of_memory.error";
- PlasmaOutOfMemoryError = PyErr_NewException(plasma_out_of_memory_error, NULL, NULL);
- Py_INCREF(PlasmaOutOfMemoryError);
- PyModule_AddObject(m, "plasma_out_of_memory_error", PlasmaOutOfMemoryError);
-
-#if PY_MAJOR_VERSION >= 3
- return m;
-#endif
-}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/extension.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/extension.h b/cpp/src/plasma/extension.h
deleted file mode 100644
index cee30ab..0000000
--- a/cpp/src/plasma/extension.h
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_EXTENSION_H
-#define PLASMA_EXTENSION_H
-
-#undef _XOPEN_SOURCE
-#undef _POSIX_C_SOURCE
-#include <Python.h>
-
-#include "bytesobject.h" // NOLINT
-
-#include "plasma/client.h"
-#include "plasma/common.h"
-
-static int PyObjectToPlasmaClient(PyObject* object, PlasmaClient** client) {
- if (PyCapsule_IsValid(object, "plasma")) {
- *client = reinterpret_cast<PlasmaClient*>(PyCapsule_GetPointer(object, "plasma"));
- return 1;
- } else {
- PyErr_SetString(PyExc_TypeError, "must be a 'plasma' capsule");
- return 0;
- }
-}
-
-int PyStringToUniqueID(PyObject* object, ObjectID* object_id) {
- if (PyBytes_Check(object)) {
- memcpy(object_id, PyBytes_AsString(object), sizeof(ObjectID));
- return 1;
- } else {
- PyErr_SetString(PyExc_TypeError, "must be a 20 character string");
- return 0;
- }
-}
-
-#endif // PLASMA_EXTENSION_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/plasma.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
index 559d8e7..bfed500 100644
--- a/cpp/src/plasma/plasma.cc
+++ b/cpp/src/plasma/plasma.cc
@@ -24,6 +24,8 @@
#include "plasma/common.h"
#include "plasma/protocol.h"
+namespace plasma {
+
int warn_if_sigpipe(int status, int client_sock) {
if (status >= 0) { return 0; }
if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
@@ -62,3 +64,5 @@ ObjectTableEntry* get_object_table_entry(
if (it == store_info->objects.end()) { return NULL; }
return it->second.get();
}
+
+} // namespace plasma
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/plasma.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 275d0c7..db8669f 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -32,8 +32,10 @@
#include "arrow/status.h"
#include "arrow/util/logging.h"
-#include "format/common_generated.h"
#include "plasma/common.h"
+#include "plasma/common_generated.h"
+
+namespace plasma {
#define HANDLE_SIGPIPE(s, fd_) \
do { \
@@ -54,47 +56,23 @@
/// Allocation granularity used in plasma for object allocation.
#define BLOCK_SIZE 64
-/// Size of object hash digests.
-constexpr int64_t kDigestSize = sizeof(uint64_t);
-
struct Client;
-/// Object request data structure. Used in the plasma_wait_for_objects()
-/// argument.
-typedef struct {
- /// The ID of the requested object. If ID_NIL request any object.
- ObjectID object_id;
- /// Request associated to the object. It can take one of the following values:
- /// - PLASMA_QUERY_LOCAL: return if or when the object is available in the
- /// local Plasma Store.
- /// - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
- /// the system (i.e., either in the local or a remote Plasma Store).
- int type;
- /// Object status. Same as the status returned by plasma_status() function
- /// call. This is filled in by plasma_wait_for_objects1():
- /// - ObjectStatus_Local: object is ready at the local Plasma Store.
- /// - ObjectStatus_Remote: object is ready at a remote Plasma Store.
- /// - ObjectStatus_Nonexistent: object does not exist in the system.
- /// - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
- /// for being transferred or it is transferring.
- int status;
-} ObjectRequest;
-
/// Mapping from object IDs to type and status of the request.
typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher> ObjectRequestMap;
/// Handle to access memory mapped file and map it into client address space.
-typedef struct {
+struct object_handle {
/// The file descriptor of the memory mapped file in the store. It is used as
/// a unique identifier of the file in the client to look up the corresponding
/// file descriptor on the client's side.
int store_fd;
/// The size in bytes of the memory mapped file.
int64_t mmap_size;
-} object_handle;
+};
// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
-typedef struct {
+struct PlasmaObject {
/// Handle for memory mapped file the object is stored in.
object_handle handle;
/// The offset in bytes in the memory mapped file of the data.
@@ -105,28 +83,21 @@ typedef struct {
int64_t data_size;
/// The size in bytes of the metadata.
int64_t metadata_size;
-} PlasmaObject;
+};
-typedef enum {
+enum object_state {
/// Object was created but not sealed in the local Plasma Store.
PLASMA_CREATED = 1,
/// Object is sealed and stored in the local Plasma Store.
PLASMA_SEALED
-} object_state;
+};
-typedef enum {
+enum object_status {
/// The object was not found.
OBJECT_NOT_FOUND = 0,
/// The object was found.
OBJECT_FOUND = 1
-} object_status;
-
-typedef enum {
- /// Query for object in the local plasma store.
- PLASMA_QUERY_LOCAL = 1,
- /// Query for object in the local plasma store or in a remote plasma store.
- PLASMA_QUERY_ANYWHERE
-} object_request_type;
+};
/// This type is used by the Plasma store. It is here because it is exposed to
/// the eviction policy.
@@ -188,4 +159,6 @@ int warn_if_sigpipe(int status, int client_sock);
uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
+} // namespace plasma
+
#endif // PLASMA_PLASMA_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/plasma.pc.in
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.pc.in b/cpp/src/plasma/plasma.pc.in
new file mode 100644
index 0000000..d868689
--- /dev/null
+++ b/cpp/src/plasma/plasma.pc.in
@@ -0,0 +1,30 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+prefix=@CMAKE_INSTALL_PREFIX@
+libdir=${prefix}/@CMAKE_INSTALL_LIBDIR@
+includedir=${prefix}/include
+
+so_version=@PLASMA_SO_VERSION@
+abi_version=@PLASMA_ABI_VERSION@
+executable=${prefix}/@CMAKE_INSTALL_BINDIR@/plasma_store
+
+Name: Plasma
+Description: Plasma is an in-memory object store and cache for big data.
+Version: @PLASMA_VERSION@
+Libs: -L${libdir} -lplasma
+Cflags: -I${includedir}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/protocol.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 246aa29..2998c68 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -18,11 +18,13 @@
#include "plasma/protocol.h"
#include "flatbuffers/flatbuffers.h"
-#include "format/plasma_generated.h"
+#include "plasma/plasma_generated.h"
#include "plasma/common.h"
#include "plasma/io.h"
+namespace plasma {
+
using flatbuffers::uoffset_t;
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
@@ -500,3 +502,5 @@ Status ReadDataReply(
*metadata_size = (int64_t)message->metadata_size();
return Status::OK();
}
+
+} // namespace plasma
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/protocol.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 5d9d136..835c5a0 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -21,9 +21,11 @@
#include <vector>
#include "arrow/status.h"
-#include "format/plasma_generated.h"
+#include "plasma/plasma_generated.h"
#include "plasma/plasma.h"
+namespace plasma {
+
using arrow::Status;
/* Plasma receive message. */
@@ -167,4 +169,6 @@ Status SendDataReply(
Status ReadDataReply(
uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size);
+} // namespace plasma
+
#endif /* PLASMA_PROTOCOL */
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/store.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 9394e3d..8d4fb10 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -49,12 +49,14 @@
#include <unordered_set>
#include <vector>
-#include "format/common_generated.h"
+#include "plasma/common_generated.h"
#include "plasma/common.h"
#include "plasma/fling.h"
#include "plasma/io.h"
#include "plasma/malloc.h"
+namespace plasma {
+
extern "C" {
void* dlmalloc(size_t bytes);
void* dlmemalign(size_t alignment, size_t bytes);
@@ -625,8 +627,10 @@ void start_server(char* socket_name, int64_t system_memory) {
loop.run();
}
+} // namespace plasma
+
int main(int argc, char* argv[]) {
- signal(SIGTERM, signal_handler);
+ signal(SIGTERM, plasma::signal_handler);
char* socket_name = NULL;
int64_t system_memory = -1;
int c;
@@ -677,7 +681,7 @@ int main(int argc, char* argv[]) {
#endif
// Make it so dlmalloc fails if we try to request more memory than is
// available.
- dlmalloc_set_footprint_limit((size_t)system_memory);
+ plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
- start_server(socket_name, system_memory);
+ plasma::start_server(socket_name, system_memory);
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/store.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index 8bd9426..27c3813 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -27,6 +27,8 @@
#include "plasma/plasma.h"
#include "plasma/protocol.h"
+namespace plasma {
+
struct GetRequest;
struct NotificationQueue {
@@ -166,4 +168,6 @@ class PlasmaStore {
std::unordered_map<int, NotificationQueue> pending_notifications_;
};
+} // namespace plasma
+
#endif // PLASMA_STORE_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/test/client_tests.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 29b5b13..6dc558e 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -29,7 +29,9 @@
#include "plasma/plasma.h"
#include "plasma/protocol.h"
-std::string g_test_executable; // NOLINT
+namespace plasma {
+
+std::string test_executable; // NOLINT
class TestPlasmaStore : public ::testing::Test {
public:
@@ -37,7 +39,7 @@ class TestPlasmaStore : public ::testing::Test {
// stdout of the object store. Consider changing that.
void SetUp() {
std::string plasma_directory =
- g_test_executable.substr(0, g_test_executable.find_last_of("/"));
+ test_executable.substr(0, test_executable.find_last_of("/"));
std::string plasma_command =
plasma_directory +
"/plasma_store -m 1000000000 -s /tmp/store 1> /dev/null 2> /dev/null &";
@@ -125,8 +127,10 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
ASSERT_EQ(object_buffer[1].data[0], 2);
}
+} // namespace plasma
+
int main(int argc, char** argv) {
::testing::InitGoogleTest(&argc, argv);
- g_test_executable = std::string(argv[0]);
+ plasma::test_executable = std::string(argv[0]);
return RUN_ALL_TESTS();
}
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/cpp/src/plasma/test/serialization_tests.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index 325cead..13938cd 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -25,6 +25,8 @@
#include "plasma/plasma.h"
#include "plasma/protocol.h"
+namespace plasma {
+
/**
* Create a temporary file. Needs to be closed by the caller.
*
@@ -386,3 +388,5 @@ TEST(PlasmaSerialization, DataReply) {
ASSERT_EQ(object_size1, object_size2);
ASSERT_EQ(metadata_size1, metadata_size2);
}
+
+} // namespace plasma
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index 224147d..6ff6646 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -220,6 +220,12 @@ include_directories(SYSTEM
find_package(Arrow REQUIRED)
include_directories(SYSTEM ${ARROW_INCLUDE_DIR})
+## Plasma
+find_package(Plasma)
+if (PLASMA_FOUND)
+ include_directories(SYSTEM ${PLASMA_INCLUDE_DIR})
+endif()
+
function(bundle_arrow_lib library_path)
get_filename_component(LIBRARY_DIR ${${library_path}} DIRECTORY)
get_filename_component(LIBRARY_NAME ${${library_path}} NAME_WE)
@@ -252,6 +258,9 @@ if (PYARROW_BUNDLE_ARROW_CPP)
file(COPY ${ARROW_INCLUDE_DIR}/arrow DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY}/include)
bundle_arrow_lib(ARROW_SHARED_LIB)
bundle_arrow_lib(ARROW_PYTHON_SHARED_LIB)
+ if (PLASMA_FOUND)
+ bundle_arrow_lib(PLASMA_SHARED_LIB)
+ endif()
endif()
if (MSVC)
@@ -278,9 +287,14 @@ set(CYTHON_EXTENSIONS
lib
)
+if (PLASMA_FOUND)
+ set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} plasma)
+endif()
+
set(LINK_LIBS
arrow_shared
arrow_python_shared
+ ${PLASMA_SHARED_LIB}
)
if (PYARROW_BUILD_PARQUET)
@@ -379,3 +393,7 @@ foreach(module ${CYTHON_EXTENSIONS})
target_link_libraries(${module_name} ${LINK_LIBS})
endforeach(module)
+
+if (PLASMA_FOUND)
+ file(COPY ${PLASMA_EXECUTABLE} DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY})
+endif()
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/cmake_modules/FindPlasma.cmake
----------------------------------------------------------------------
diff --git a/python/cmake_modules/FindPlasma.cmake b/python/cmake_modules/FindPlasma.cmake
new file mode 100644
index 0000000..3acaa34
--- /dev/null
+++ b/python/cmake_modules/FindPlasma.cmake
@@ -0,0 +1,99 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
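+# - Find PLASMA (plasma/client.h, libplasma.a, libplasma.so)
+# This module defines
+#  PLASMA_INCLUDE_DIR, directory containing headers
+#  PLASMA_LIBS, directory containing plasma libraries
+#  PLASMA_STATIC_LIB, path to libplasma.a
+#  PLASMA_SHARED_LIB, path to libplasma's shared library
+#  PLASMA_SHARED_IMP_LIB, path to libplasma's import library (MSVC only;
+#    not currently set by this module)
+#  PLASMA_EXECUTABLE, path to the plasma_store executable
+#  PLASMA_FOUND, whether plasma has been found
+#
+# Search order: if the ARROW_HOME environment variable is set, headers and
+# libraries are looked up only under $ARROW_HOME/include and $ARROW_HOME/lib;
+# otherwise the "plasma" pkg-config module is queried.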
+
+include(FindPkgConfig)
+
+if ("$ENV{ARROW_HOME}" STREQUAL "")
+  # No explicit install prefix: ask pkg-config for the "plasma" module.
+  pkg_check_modules(PLASMA plasma)
+  if (PLASMA_FOUND)
+    pkg_get_variable(PLASMA_EXECUTABLE plasma executable)
+    pkg_get_variable(PLASMA_ABI_VERSION plasma abi_version)
+    message(STATUS "Plasma ABI version: ${PLASMA_ABI_VERSION}")
+    pkg_get_variable(PLASMA_SO_VERSION plasma so_version)
+    message(STATUS "Plasma SO version: ${PLASMA_SO_VERSION}")
+    set(PLASMA_INCLUDE_DIR ${PLASMA_INCLUDE_DIRS})
+    set(PLASMA_LIBS ${PLASMA_LIBRARY_DIRS})
+    set(PLASMA_SEARCH_LIB_PATH ${PLASMA_LIBRARY_DIRS})
+  endif()
+else()
+  # Explicit install prefix: search only under $ARROW_HOME.
+  set(PLASMA_HOME "$ENV{ARROW_HOME}")
+
+  set(PLASMA_EXECUTABLE ${PLASMA_HOME}/bin/plasma_store)
+
+  set(PLASMA_SEARCH_HEADER_PATHS
+    ${PLASMA_HOME}/include
+    )
+
+  set(PLASMA_SEARCH_LIB_PATH
+    ${PLASMA_HOME}/lib
+    )
+
+  find_path(PLASMA_INCLUDE_DIR plasma/client.h PATHS
+    ${PLASMA_SEARCH_HEADER_PATHS}
+    # make sure we don't accidentally pick up a different version
+    NO_DEFAULT_PATH
+    )
+endif()
+
+find_library(PLASMA_LIB_PATH NAMES plasma
+  PATHS
+  ${PLASMA_SEARCH_LIB_PATH}
+  NO_DEFAULT_PATH)
+if (PLASMA_LIB_PATH)
+  # Quoted so a path containing spaces stays a single argument.
+  get_filename_component(PLASMA_LIBS "${PLASMA_LIB_PATH}" DIRECTORY)
+else()
+  # Library not found: clear any directory pkg-config may have reported so
+  # the check below treats Plasma as missing.
+  set(PLASMA_LIBS "")
+endif()
+
+if (PLASMA_INCLUDE_DIR AND PLASMA_LIBS)
+  set(PLASMA_FOUND TRUE)
+  set(PLASMA_LIB_NAME plasma)
+
+  # Use the platform's library prefix/suffix ("lib" / ".a" / ".so" or
+  # ".dylib" on Unix-like systems).
+  set(PLASMA_STATIC_LIB
+    ${PLASMA_LIBS}/${CMAKE_STATIC_LIBRARY_PREFIX}${PLASMA_LIB_NAME}${CMAKE_STATIC_LIBRARY_SUFFIX})
+
+  set(PLASMA_SHARED_LIB
+    ${PLASMA_LIBS}/${CMAKE_SHARED_LIBRARY_PREFIX}${PLASMA_LIB_NAME}${CMAKE_SHARED_LIBRARY_SUFFIX})
+endif()
+
+if (PLASMA_FOUND)
+  if (NOT Plasma_FIND_QUIETLY)
+    message(STATUS "Found the Plasma core library: ${PLASMA_LIB_PATH}")
+    message(STATUS "Found Plasma executable: ${PLASMA_EXECUTABLE}")
+  endif ()
+else ()
+  if (NOT Plasma_FIND_QUIETLY)
+    set(PLASMA_ERR_MSG "Could not find the Plasma library.")
+    if ("$ENV{ARROW_HOME}" STREQUAL "")
+      # The pkg-config branch has no explicit search paths to report, so
+      # say how the lookup was done instead of printing empty paths.
+      set(PLASMA_ERR_MSG "${PLASMA_ERR_MSG} Searched via pkg-config (module 'plasma');")
+      set(PLASMA_ERR_MSG "${PLASMA_ERR_MSG} set ARROW_HOME to search a specific prefix")
+    else ()
+      set(PLASMA_ERR_MSG "${PLASMA_ERR_MSG} Looked for headers")
+      set(PLASMA_ERR_MSG "${PLASMA_ERR_MSG} in ${PLASMA_SEARCH_HEADER_PATHS}, and for libs")
+      set(PLASMA_ERR_MSG "${PLASMA_ERR_MSG} in ${PLASMA_SEARCH_LIB_PATH}")
+    endif ()
+    if (Plasma_FIND_REQUIRED)
+      message(FATAL_ERROR "${PLASMA_ERR_MSG}")
+    else (Plasma_FIND_REQUIRED)
+      message(STATUS "${PLASMA_ERR_MSG}")
+    endif (Plasma_FIND_REQUIRED)
+  endif ()
+  set(PLASMA_FOUND FALSE)
+endif ()
+
+mark_as_advanced(
+  PLASMA_INCLUDE_DIR
+  PLASMA_STATIC_LIB
+  PLASMA_SHARED_LIB
+)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/doc/source/api.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index c52d400..780aa48 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -212,6 +212,21 @@ Type Classes
Field
Schema
+.. currentmodule:: pyarrow.plasma
+
+.. _api.plasma:
+
+In-Memory Object Store
+----------------------
+
+.. autosummary::
+ :toctree: generated/
+
+ ObjectID
+ PlasmaClient
+ PlasmaBuffer
+ MutablePlasmaBuffer
+
.. currentmodule:: pyarrow.parquet
.. _api.parquet:
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/manylinux1/build_arrow.sh
----------------------------------------------------------------------
diff --git a/python/manylinux1/build_arrow.sh b/python/manylinux1/build_arrow.sh
index 8c6bda9..85c096a 100755
--- a/python/manylinux1/build_arrow.sh
+++ b/python/manylinux1/build_arrow.sh
@@ -35,6 +35,7 @@ cd /arrow/python
# PyArrow build configuration
export PYARROW_BUILD_TYPE='release'
export PYARROW_WITH_PARQUET=1
+export PYARROW_WITH_PLASMA=1
export PYARROW_BUNDLE_ARROW_CPP=1
# Need as otherwise arrow_io is sometimes not linked
export LDFLAGS="-Wl,--no-as-needed"
@@ -52,7 +53,7 @@ for PYTHON in ${PYTHON_VERSIONS}; do
ARROW_BUILD_DIR=/arrow/cpp/build-PY${PYTHON}
mkdir -p "${ARROW_BUILD_DIR}"
pushd "${ARROW_BUILD_DIR}"
- PATH="$(cpython_path $PYTHON)/bin:$PATH" cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/arrow-dist -DARROW_BUILD_TESTS=OFF -DARROW_BUILD_SHARED=ON -DARROW_BOOST_USE_SHARED=OFF -DARROW_JEMALLOC=ON -DARROW_RPATH_ORIGIN=ON -DARROW_JEMALLOC_USE_SHARED=OFF -DARROW_PYTHON=ON -DPythonInterp_FIND_VERSION=${PYTHON} ..
+ PATH="$(cpython_path $PYTHON)/bin:$PATH" cmake -DCMAKE_BUILD_TYPE=Release -DCMAKE_INSTALL_PREFIX=/arrow-dist -DARROW_BUILD_TESTS=OFF -DARROW_BUILD_SHARED=ON -DARROW_BOOST_USE_SHARED=OFF -DARROW_JEMALLOC=ON -DARROW_RPATH_ORIGIN=ON -DARROW_JEMALLOC_USE_SHARED=OFF -DARROW_PYTHON=ON -DPythonInterp_FIND_VERSION=${PYTHON} -DARROW_PLASMA=ON ..
make -j5 install
popd
@@ -65,6 +66,7 @@ for PYTHON in ${PYTHON_VERSIONS}; do
echo "=== (${PYTHON}) Test the existence of optional modules ==="
$PIPI_IO -r requirements.txt
PATH="$PATH:$(cpython_path $PYTHON)/bin" $PYTHON_INTERPRETER -c "import pyarrow.parquet"
+ PATH="$PATH:$(cpython_path $PYTHON)/bin" $PYTHON_INTERPRETER -c "import pyarrow.plasma"
echo "=== (${PYTHON}) Tag the wheel with manylinux1 ==="
mkdir -p repaired_wheels/
@@ -78,4 +80,3 @@ for PYTHON in ${PYTHON_VERSIONS}; do
mv repaired_wheels/*.whl /io/dist
done
-
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index e3d783a..6d0ce20 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -68,6 +68,7 @@ from pyarrow.lib import (null, bool_,
Date32Value, Date64Value, TimestampValue)
from pyarrow.lib import (HdfsFile, NativeFile, PythonFile,
+ FixedSizeBufferOutputStream,
Buffer, BufferReader, BufferOutputStream,
OSFile, MemoryMappedFile, memory_map,
frombuffer,
@@ -99,7 +100,6 @@ from pyarrow.ipc import (Message, MessageReader,
open_file,
serialize_pandas, deserialize_pandas)
-
localfs = LocalFilesystem.get_instance()
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/error.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/error.pxi b/python/pyarrow/error.pxi
index 259aeb0..8a3f57d 100644
--- a/python/pyarrow/error.pxi
+++ b/python/pyarrow/error.pxi
@@ -48,6 +48,18 @@ class ArrowNotImplementedError(NotImplementedError, ArrowException):
pass
+class PlasmaObjectExists(ArrowException):
+    """Raised by check_status when the status reports that the Plasma
+    object already exists."""
+
+
+class PlasmaObjectNonexistent(ArrowException):
+    """Raised by check_status when the status reports that the Plasma
+    object does not exist."""
+
+
+class PlasmaStoreFull(ArrowException):
+    """Raised by check_status when the status reports that the Plasma
+    store is full."""
+
+
cdef int check_status(const CStatus& status) nogil except -1:
if status.ok():
return 0
@@ -66,5 +78,11 @@ cdef int check_status(const CStatus& status) nogil except -1:
raise ArrowNotImplementedError(message)
elif status.IsTypeError():
raise ArrowTypeError(message)
+ elif status.IsPlasmaObjectExists():
+ raise PlasmaObjectExists(message)
+ elif status.IsPlasmaObjectNonexistent():
+ raise PlasmaObjectNonexistent(message)
+ elif status.IsPlasmaStoreFull():
+ raise PlasmaStoreFull(message)
else:
raise ArrowException(message)
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/includes/common.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/common.pxd b/python/pyarrow/includes/common.pxd
index 3487d48..637a133 100644
--- a/python/pyarrow/includes/common.pxd
+++ b/python/pyarrow/includes/common.pxd
@@ -50,6 +50,9 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
c_bool IsKeyError()
c_bool IsNotImplemented()
c_bool IsTypeError()
+ c_bool IsPlasmaObjectExists()
+ c_bool IsPlasmaObjectNonexistent()
+ c_bool IsPlasmaStoreFull()
cdef inline object PyObject_to_object(PyObject* o):
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/includes/libarrow.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index edf50ad..ffe867b 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -148,9 +148,15 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
CLoggingMemoryPool(CMemoryPool*)
cdef cppclass CBuffer" arrow::Buffer":
+ CBuffer(const uint8_t* data, int64_t size)
uint8_t* data()
int64_t size()
shared_ptr[CBuffer] parent()
+ c_bool is_mutable() const
+
+ cdef cppclass CMutableBuffer" arrow::MutableBuffer"(CBuffer):
+ CMutableBuffer(const uint8_t* data, int64_t size)
+ uint8_t* mutable_data()
cdef cppclass ResizableBuffer(CBuffer):
CStatus Resize(int64_t nbytes)
@@ -558,6 +564,9 @@ cdef extern from "arrow/io/memory.h" namespace "arrow::io" nogil:
CMockOutputStream()
int64_t GetExtentBytesWritten()
+ cdef cppclass CFixedSizeBufferWriter" arrow::io::FixedSizeBufferWriter"(WriteableFile):
+ CFixedSizeBufferWriter(const shared_ptr[CBuffer]& buffer)
+
cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
enum MessageType" arrow::ipc::Message::Type":
http://git-wip-us.apache.org/repos/asf/arrow/blob/a94f4716/python/pyarrow/io.pxi
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 8b213a3..181b0b1 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -473,6 +473,15 @@ cdef class OSFile(NativeFile):
self.wr_file = <shared_ptr[OutputStream]> handle
+cdef class FixedSizeBufferOutputStream(NativeFile):
+    """
+    Write-only stream that writes into an existing, fixed-size Arrow buffer.
+
+    Parameters
+    ----------
+    buffer : Buffer
+        Destination buffer. It is handed to arrow::io::FixedSizeBufferWriter
+        without copying and must be mutable.
+
+    Raises
+    ------
+    TypeError
+        If ``buffer`` is None.
+    ValueError
+        If ``buffer`` is not mutable.
+    """
+
+    def __cinit__(self, Buffer buffer not None):
+        # `not None` prevents dereferencing a null `buffer.buffer` below.
+        # The C++ writer targets the buffer's own memory, so a read-only
+        # buffer must be rejected here rather than written through.
+        if not buffer.buffer.get().is_mutable():
+            raise ValueError("FixedSizeBufferOutputStream requires a "
+                             "mutable buffer")
+        self.wr_file.reset(new CFixedSizeBufferWriter(buffer.buffer))
+        self.is_readable = 0
+        self.is_writeable = 1
+        self.is_open = True
+
+
# ----------------------------------------------------------------------
# Arrow buffers
@@ -523,7 +532,10 @@ cdef class Buffer:
buffer.len = self.size
buffer.ndim = 1
buffer.obj = self
- buffer.readonly = 1
+ if self.buffer.get().is_mutable():
+ buffer.readonly = 0
+ else:
+ buffer.readonly = 1
buffer.shape = self.shape
buffer.strides = self.strides
buffer.suboffsets = NULL
@@ -540,6 +552,15 @@ cdef class Buffer:
p[0] = <void*> self.buffer.get().data()
return self.size
+ def __getwritebuffer__(self, Py_ssize_t idx, void **p):
+ # Legacy (pre-PEP 3118) segment buffer hook: expose the Arrow buffer
+ # as a single writable segment. Returns the segment length in bytes.
+ # Writing is only allowed when the underlying arrow::Buffer is mutable.
+ if not self.buffer.get().is_mutable():
+ raise SystemError("trying to write an immutable buffer")
+ # Only segment 0 exists; any other index is an error.
+ if idx != 0:
+ raise SystemError("accessing non-existent buffer segment")
+ # p may be NULL when the caller only wants the segment length.
+ if p != NULL:
+ # Safe to hand out for writing: mutability was checked above.
+ p[0] = <void*> self.buffer.get().data()
+ return self.size
+
cdef shared_ptr[PoolBuffer] allocate_buffer(CMemoryPool* pool):
cdef shared_ptr[PoolBuffer] result