You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/05/14 23:57:08 UTC
[arrow] branch master updated: ARROW-2577: [Plasma] Add asv benchmarks for plasma
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 75acaba ARROW-2577: [Plasma] Add asv benchmarks for plasma
75acaba is described below
commit 75acaba67d7e2ead5ccc51e42396e916b44db3f9
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Mon May 14 16:56:58 2018 -0700
ARROW-2577: [Plasma] Add asv benchmarks for plasma
This adds some initial ASV benchmarks for plasma:
- Put latency
- Get latency
- Put throughput for 1KB, 100KB, and 10MB objects
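It also includes some minor code restructuring to expose the start_plasma_store function: the Cython extension is renamed to pyarrow._plasma, and a new pure-Python pyarrow.plasma module re-exports its names and defines start_plasma_store.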
Author: Philipp Moritz <pc...@gmail.com>
Closes #2038 from pcmoritz/plasma-asv and squashes the following commits:
34a06845 <Philipp Moritz> measure wallclock time instead of process cpu time
c89256f7 <Philipp Moritz> parametrize tests
3567ddc7 <Philipp Moritz> fix windows build
eca17675 <Philipp Moritz> build plasma in asv
47671b34 <Philipp Moritz> fix
1261177e <Philipp Moritz> fix linting errors
7d4d6854 <Philipp Moritz> Add asv benchmarks for plasma
---
python/CMakeLists.txt | 2 +-
python/asv-build.sh | 2 +
python/benchmarks/plasma.py | 68 +++++++++++++++++++++
python/pyarrow/{plasma.pyx => _plasma.pyx} | 0
python/pyarrow/plasma.py | 97 ++++++++++++++++++++++++++++++
python/pyarrow/tests/test_plasma.py | 88 +++------------------------
python/setup.py | 4 +-
7 files changed, 179 insertions(+), 82 deletions(-)
diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index fcc1d3c..99194c2 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -404,7 +404,7 @@ if (PYARROW_BUILD_PLASMA)
${LINK_LIBS}
libplasma_shared)
- set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} plasma)
+ set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _plasma)
file(COPY ${PLASMA_EXECUTABLE} DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY})
endif()
diff --git a/python/asv-build.sh b/python/asv-build.sh
index 31e56ed..2bbc94b 100755
--- a/python/asv-build.sh
+++ b/python/asv-build.sh
@@ -37,6 +37,7 @@ cmake -GNinja \
-DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
-DARROW_CXXFLAGS=$CXXFLAGS \
-DARROW_PYTHON=ON \
+ -DARROW_PLASMA=ON \
-DARROW_BUILD_TESTS=OFF \
..
cmake --build . --target install
@@ -47,6 +48,7 @@ popd
# Build pyarrow wrappers
export SETUPTOOLS_SCM_PRETEND_VERSION=0.0.1
export PYARROW_BUILD_TYPE=release
+export PYARROW_WITH_PLASMA=1
python setup.py clean
find pyarrow -name "*.so" -delete
diff --git a/python/benchmarks/plasma.py b/python/benchmarks/plasma.py
new file mode 100644
index 0000000..8a607a3
--- /dev/null
+++ b/python/benchmarks/plasma.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numpy as np
+import timeit
+
+import pyarrow as pa
+import pyarrow.plasma as plasma
+
+from . import common
+
+
+class SimplePlasmaThroughput(object):
+ """Benchmark plasma store throughput with a single client."""
+
+ params = [1000, 100000, 10000000]
+
+ timer = timeit.default_timer
+
+ def setup(self, size):
+ self.plasma_store_ctx = plasma.start_plasma_store(plasma_store_memory=10**9)
+ plasma_store_name, p = self.plasma_store_ctx.__enter__()
+ self.plasma_client = plasma.connect(plasma_store_name, "", 64)
+
+ self.data = np.random.randn(size // 8)
+
+ def teardown(self, size):
+ self.plasma_store_ctx.__exit__(None, None, None)
+
+ def time_plasma_put_data(self, size):
+ self.plasma_client.put(self.data)
+
+
+class SimplePlasmaLatency(object):
+ """Benchmark plasma store latency with a single client."""
+
+ timer = timeit.default_timer
+
+ def setup(self):
+ self.plasma_store_ctx = plasma.start_plasma_store(plasma_store_memory=10**9)
+ plasma_store_name, p = self.plasma_store_ctx.__enter__()
+ self.plasma_client = plasma.connect(plasma_store_name, "", 64)
+
+ def teardown(self):
+ self.plasma_store_ctx.__exit__(None, None, None)
+
+ def time_plasma_put(self):
+ for i in range(1000):
+ self.plasma_client.put(1)
+
+ def time_plasma_putget(self):
+ for i in range(1000):
+ x = self.plasma_client.put(1)
+ self.plasma_client.get(x)
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/_plasma.pyx
similarity index 100%
rename from python/pyarrow/plasma.pyx
rename to python/pyarrow/_plasma.pyx
diff --git a/python/pyarrow/plasma.py b/python/pyarrow/plasma.py
new file mode 100644
index 0000000..2fb3974
--- /dev/null
+++ b/python/pyarrow/plasma.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import os
+import pyarrow as pa
+import subprocess
+import shutil
+import tempfile
+import time
+
+from pyarrow._plasma import (ObjectID, ObjectNotAvailable, # noqa
+ PlasmaBuffer, PlasmaClient, connect)
+
+
+@contextlib.contextmanager
+def start_plasma_store(plasma_store_memory,
+ use_valgrind=False, use_profiler=False,
+ use_one_memory_mapped_file=False,
+ plasma_directory=None, use_hugepages=False):
+ """Start a plasma store process.
+ Args:
+ plasma_store_memory (int): Capacity of the plasma store in bytes.
+ use_valgrind (bool): True if the plasma store should be started inside
+ of valgrind. If this is True, use_profiler must be False.
+ use_profiler (bool): True if the plasma store should be started inside
+ a profiler. If this is True, use_valgrind must be False.
+ use_one_memory_mapped_file: If True, then the store will use only a
+ single memory-mapped file.
+ plasma_directory (str): Directory where plasma memory mapped files
+ will be stored.
+ use_hugepages (bool): True if the plasma store should use huge pages.
+ Return:
+ A tuple of the name of the plasma store socket and the process ID of
+ the plasma store process.
+ """
+ if use_valgrind and use_profiler:
+ raise Exception("Cannot use valgrind and profiler at the same time.")
+
+ tmpdir = tempfile.mkdtemp(prefix='test_plasma-')
+ try:
+ plasma_store_name = os.path.join(tmpdir, 'plasma.sock')
+ plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store")
+ command = [plasma_store_executable,
+ "-s", plasma_store_name,
+ "-m", str(plasma_store_memory)]
+ if use_one_memory_mapped_file:
+ command += ["-f"]
+ if plasma_directory:
+ command += ["-d", plasma_directory]
+ if use_hugepages:
+ command += ["-h"]
+ stdout_file = None
+ stderr_file = None
+ if use_valgrind:
+ command = ["valgrind",
+ "--track-origins=yes",
+ "--leak-check=full",
+ "--show-leak-kinds=all",
+ "--leak-check-heuristics=stdstring",
+ "--error-exitcode=1"] + command
+ proc = subprocess.Popen(command, stdout=stdout_file,
+ stderr=stderr_file)
+ time.sleep(1.0)
+ elif use_profiler:
+ command = ["valgrind", "--tool=callgrind"] + command
+ proc = subprocess.Popen(command, stdout=stdout_file,
+ stderr=stderr_file)
+ time.sleep(1.0)
+ else:
+ proc = subprocess.Popen(command, stdout=stdout_file,
+ stderr=stderr_file)
+ time.sleep(0.1)
+ rc = proc.poll()
+ if rc is not None:
+ raise RuntimeError("plasma_store exited unexpectedly with "
+ "code %d" % (rc,))
+
+ yield plasma_store_name, proc
+ finally:
+ if proc.poll() is None:
+ proc.kill()
+ shutil.rmtree(tmpdir)
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index de11eb6..fe5cafb 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -19,16 +19,11 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
-import contextlib
import os
import pytest
import random
-import shutil
import signal
import sys
-import subprocess
-import tempfile
-import time
import numpy as np
import pyarrow as pa
@@ -106,76 +101,6 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
assert plasma.buffers_equal(metadata, client1_metadata)
-@contextlib.contextmanager
-def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
- use_valgrind=False, use_profiler=False,
- use_one_memory_mapped_file=False,
- plasma_directory=None, use_hugepages=False):
- """Start a plasma store process.
- Args:
- use_valgrind (bool): True if the plasma store should be started inside
- of valgrind. If this is True, use_profiler must be False.
- use_profiler (bool): True if the plasma store should be started inside
- a profiler. If this is True, use_valgrind must be False.
- stdout_file: A file handle opened for writing to redirect stdout to. If
- no redirection should happen, then this should be None.
- stderr_file: A file handle opened for writing to redirect stderr to. If
- no redirection should happen, then this should be None.
- use_one_memory_mapped_file: If True, then the store will use only a
- single memory-mapped file.
- Return:
- A tuple of the name of the plasma store socket and the process ID of
- the plasma store process.
- """
- if use_valgrind and use_profiler:
- raise Exception("Cannot use valgrind and profiler at the same time.")
-
- tmpdir = tempfile.mkdtemp(prefix='test_plasma-')
- try:
- plasma_store_name = os.path.join(tmpdir, 'plasma.sock')
- plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store")
- command = [plasma_store_executable,
- "-s", plasma_store_name,
- "-m", str(plasma_store_memory)]
- if use_one_memory_mapped_file:
- command += ["-f"]
- if plasma_directory:
- command += ["-d", plasma_directory]
- if use_hugepages:
- command += ["-h"]
- stdout_file = None
- stderr_file = None
- if use_valgrind:
- command = ["valgrind",
- "--track-origins=yes",
- "--leak-check=full",
- "--show-leak-kinds=all",
- "--leak-check-heuristics=stdstring",
- "--error-exitcode=1"] + command
- proc = subprocess.Popen(command, stdout=stdout_file,
- stderr=stderr_file)
- time.sleep(1.0)
- elif use_profiler:
- command = ["valgrind", "--tool=callgrind"] + command
- proc = subprocess.Popen(command, stdout=stdout_file,
- stderr=stderr_file)
- time.sleep(1.0)
- else:
- proc = subprocess.Popen(command, stdout=stdout_file,
- stderr=stderr_file)
- time.sleep(0.1)
- rc = proc.poll()
- if rc is not None:
- raise RuntimeError("plasma_store exited unexpectedly with "
- "code %d" % (rc,))
-
- yield plasma_store_name, proc
- finally:
- if proc.poll() is None:
- proc.kill()
- shutil.rmtree(tmpdir)
-
-
@pytest.mark.plasma
class TestPlasmaClient(object):
@@ -185,7 +110,8 @@ class TestPlasmaClient(object):
import pyarrow.plasma as plasma
# Start Plasma store.
- self.plasma_store_ctx = start_plasma_store(
+ self.plasma_store_ctx = plasma.start_plasma_store(
+ plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
use_valgrind=USE_VALGRIND,
use_one_memory_mapped_file=use_one_memory_mapped_file)
plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
@@ -839,8 +765,10 @@ def test_object_id_equality_operators():
reason="requires hugepage support")
def test_use_huge_pages():
import pyarrow.plasma as plasma
- with start_plasma_store(plasma_directory="/mnt/hugepages",
- use_hugepages=True) as (plasma_store_name, p):
+ with plasma.start_plasma_store(
+ plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
+ plasma_directory="/mnt/hugepages",
+ use_hugepages=True) as (plasma_store_name, p):
plasma_client = plasma.connect(plasma_store_name, "", 64)
create_object(plasma_client, 100000000)
@@ -852,7 +780,9 @@ def test_use_huge_pages():
def test_plasma_client_sharing():
import pyarrow.plasma as plasma
- with start_plasma_store() as (plasma_store_name, p):
+ with plasma.start_plasma_store(
+ plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
+ as (plasma_store_name, p):
plasma_client = plasma.connect(plasma_store_name, "", 64)
object_id = plasma_client.put(np.zeros(3))
buf = plasma_client.get(object_id)
diff --git a/python/setup.py b/python/setup.py
index 8d26e09..2e0d982 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -144,7 +144,7 @@ class build_ext(_build_ext):
'lib',
'_parquet',
'_orc',
- 'plasma']
+ '_plasma']
def _run_cmake(self):
# The directory containing this setup.py
@@ -328,7 +328,7 @@ class build_ext(_build_ext):
def _failure_permitted(self, name):
if name == '_parquet' and not self.with_parquet:
return True
- if name == 'plasma' and not self.with_plasma:
+ if name == '_plasma' and not self.with_plasma:
return True
if name == '_orc' and not self.with_orc:
return True
--
To stop receiving notification emails like this one, please contact
pcmoritz@apache.org.