You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/05/14 23:57:08 UTC

[arrow] branch master updated: ARROW-2577: [Plasma] Add asv benchmarks for plasma

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 75acaba  ARROW-2577: [Plasma] Add asv benchmarks for plasma
75acaba is described below

commit 75acaba67d7e2ead5ccc51e42396e916b44db3f9
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Mon May 14 16:56:58 2018 -0700

    ARROW-2577: [Plasma] Add asv benchmarks for plasma
    
    This adds some initial ASV benchmarks for plasma:
    
    - Put latency
    - Get latency
    - Put throughput for 1KB, 10KB, 100KB, 1MB, 10MB, 100MB
    
    It also includes some minor code restructuring to expose the start_plasma_store method.
    
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #2038 from pcmoritz/plasma-asv and squashes the following commits:
    
    34a06845 <Philipp Moritz> measure wallclock time instead of process cpu time
    c89256f7 <Philipp Moritz> parametrize tests
    3567ddc7 <Philipp Moritz> fix windows build
    eca17675 <Philipp Moritz> build plasma in asv
    47671b34 <Philipp Moritz> fix
    1261177e <Philipp Moritz> fix linting errors
    7d4d6854 <Philipp Moritz> Add asv benchmarks for plasma
---
 python/CMakeLists.txt                      |  2 +-
 python/asv-build.sh                        |  2 +
 python/benchmarks/plasma.py                | 68 +++++++++++++++++++++
 python/pyarrow/{plasma.pyx => _plasma.pyx} |  0
 python/pyarrow/plasma.py                   | 97 ++++++++++++++++++++++++++++++
 python/pyarrow/tests/test_plasma.py        | 88 +++------------------------
 python/setup.py                            |  4 +-
 7 files changed, 179 insertions(+), 82 deletions(-)

diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt
index fcc1d3c..99194c2 100644
--- a/python/CMakeLists.txt
+++ b/python/CMakeLists.txt
@@ -404,7 +404,7 @@ if (PYARROW_BUILD_PLASMA)
     ${LINK_LIBS}
     libplasma_shared)
 
-  set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} plasma)
+  set(CYTHON_EXTENSIONS ${CYTHON_EXTENSIONS} _plasma)
   file(COPY ${PLASMA_EXECUTABLE} DESTINATION ${BUILD_OUTPUT_ROOT_DIRECTORY})
 endif()
 
diff --git a/python/asv-build.sh b/python/asv-build.sh
index 31e56ed..2bbc94b 100755
--- a/python/asv-build.sh
+++ b/python/asv-build.sh
@@ -37,6 +37,7 @@ cmake -GNinja \
       -DCMAKE_INSTALL_PREFIX=$ARROW_HOME \
       -DARROW_CXXFLAGS=$CXXFLAGS \
       -DARROW_PYTHON=ON \
+      -DARROW_PLASMA=ON \
       -DARROW_BUILD_TESTS=OFF \
       ..
 cmake --build . --target install
@@ -47,6 +48,7 @@ popd
 # Build pyarrow wrappers
 export SETUPTOOLS_SCM_PRETEND_VERSION=0.0.1
 export PYARROW_BUILD_TYPE=release
+export PYARROW_WITH_PLASMA=1
 
 python setup.py clean
 find pyarrow -name "*.so" -delete
diff --git a/python/benchmarks/plasma.py b/python/benchmarks/plasma.py
new file mode 100644
index 0000000..8a607a3
--- /dev/null
+++ b/python/benchmarks/plasma.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import numpy as np
+import timeit
+
+import pyarrow as pa
+import pyarrow.plasma as plasma
+
+from . import common
+
+
+class SimplePlasmaThroughput(object):
+    """Measure put throughput of one client for several payload sizes."""
+
+    # Payload sizes in bytes; each one is handed to the methods as `size`.
+    params = [1000, 100000, 10000000]
+    timer = timeit.default_timer  # wall-clock rather than process CPU time
+
+    def setup(self, size):
+        store_ctx = plasma.start_plasma_store(plasma_store_memory=10**9)
+        self.plasma_store_ctx = store_ctx
+        socket_name, _ = store_ctx.__enter__()
+        self.plasma_client = plasma.connect(socket_name, "", 64)
+        self.data = np.random.randn(size // 8)  # 8 bytes per float64
+
+    def teardown(self, size):
+        self.plasma_store_ctx.__exit__(None, None, None)
+
+    def time_plasma_put_data(self, size):
+        self.plasma_client.put(self.data)
+
+
+class SimplePlasmaLatency(object):
+    """Measure put and put-then-get latency of the store with one client."""
+
+    timer = timeit.default_timer  # wall-clock rather than process CPU time
+
+    def setup(self):
+        store_ctx = plasma.start_plasma_store(plasma_store_memory=10**9)
+        self.plasma_store_ctx = store_ctx
+        socket_name, _ = store_ctx.__enter__()
+        self.plasma_client = plasma.connect(socket_name, "", 64)
+
+    def teardown(self):
+        self.plasma_store_ctx.__exit__(None, None, None)
+
+    def time_plasma_put(self):
+        for _ in range(1000):
+            self.plasma_client.put(1)
+
+    def time_plasma_putget(self):
+        for _ in range(1000):
+            self.plasma_client.get(self.plasma_client.put(1))
diff --git a/python/pyarrow/plasma.pyx b/python/pyarrow/_plasma.pyx
similarity index 100%
rename from python/pyarrow/plasma.pyx
rename to python/pyarrow/_plasma.pyx
diff --git a/python/pyarrow/plasma.py b/python/pyarrow/plasma.py
new file mode 100644
index 0000000..2fb3974
--- /dev/null
+++ b/python/pyarrow/plasma.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import contextlib
+import os
+import pyarrow as pa
+import subprocess
+import shutil
+import tempfile
+import time
+
+from pyarrow._plasma import (ObjectID, ObjectNotAvailable, # noqa
+                             PlasmaBuffer, PlasmaClient, connect)
+
+
+@contextlib.contextmanager
+def start_plasma_store(plasma_store_memory,
+                       use_valgrind=False, use_profiler=False,
+                       use_one_memory_mapped_file=False,
+                       plasma_directory=None, use_hugepages=False):
+    """Start a plasma store process that lives as long as the `with` block.
+    Args:
+        plasma_store_memory (int): Capacity of the plasma store in bytes.
+        use_valgrind (bool): True if the plasma store should be started inside
+            of valgrind. If this is True, use_profiler must be False.
+        use_profiler (bool): True if the plasma store should be started inside
+            a profiler. If this is True, use_valgrind must be False.
+        use_one_memory_mapped_file: If True, then the store will use only a
+            single memory-mapped file.
+        plasma_directory (str): Directory where plasma memory mapped files
+            will be stored.
+        use_hugepages (bool): True if the plasma store should use huge pages.
+    Yields:
+        A tuple of the name of the plasma store socket and the
+            subprocess.Popen object of the plasma store process.
+    Raises:
+        Exception: If use_valgrind and use_profiler are both True.
+        RuntimeError: If the store has already exited after the startup wait.
+    """
+    if use_valgrind and use_profiler:
+        raise Exception("Cannot use valgrind and profiler at the same time.")
+
+    tmpdir = tempfile.mkdtemp(prefix='test_plasma-')
+    # Bound up front so `finally` cannot mask a Popen failure with a NameError.
+    proc = None
+    try:
+        plasma_store_name = os.path.join(tmpdir, 'plasma.sock')
+        plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store")
+        command = [plasma_store_executable,
+                   "-s", plasma_store_name,
+                   "-m", str(plasma_store_memory)]
+        if use_one_memory_mapped_file:
+            command += ["-f"]
+        if plasma_directory:
+            command += ["-d", plasma_directory]
+        if use_hugepages:
+            command += ["-h"]
+        # Runs under valgrind/callgrind are given a longer startup wait.
+        startup_delay = 0.1
+        if use_valgrind:
+            command = ["valgrind",
+                       "--track-origins=yes",
+                       "--leak-check=full",
+                       "--show-leak-kinds=all",
+                       "--leak-check-heuristics=stdstring",
+                       "--error-exitcode=1"] + command
+            startup_delay = 1.0
+        elif use_profiler:
+            command = ["valgrind", "--tool=callgrind"] + command
+            startup_delay = 1.0
+        # stdout/stderr are not redirected; the store inherits the parent's.
+        proc = subprocess.Popen(command)
+        time.sleep(startup_delay)
+        rc = proc.poll()
+        if rc is not None:
+            raise RuntimeError("plasma_store exited unexpectedly with "
+                               "code %d" % (rc,))
+
+        yield plasma_store_name, proc
+    finally:
+        if proc is not None and proc.poll() is None:
+            proc.kill()
+            proc.wait()  # reap the killed child before removing its socket dir
+        shutil.rmtree(tmpdir)
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index de11eb6..fe5cafb 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -19,16 +19,11 @@ from __future__ import absolute_import
 from __future__ import division
 from __future__ import print_function
 
-import contextlib
 import os
 import pytest
 import random
-import shutil
 import signal
 import sys
-import subprocess
-import tempfile
-import time
 
 import numpy as np
 import pyarrow as pa
@@ -106,76 +101,6 @@ def assert_get_object_equal(unit_test, client1, client2, object_id,
         assert plasma.buffers_equal(metadata, client1_metadata)
 
 
-@contextlib.contextmanager
-def start_plasma_store(plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
-                       use_valgrind=False, use_profiler=False,
-                       use_one_memory_mapped_file=False,
-                       plasma_directory=None, use_hugepages=False):
-    """Start a plasma store process.
-    Args:
-        use_valgrind (bool): True if the plasma store should be started inside
-            of valgrind. If this is True, use_profiler must be False.
-        use_profiler (bool): True if the plasma store should be started inside
-            a profiler. If this is True, use_valgrind must be False.
-        stdout_file: A file handle opened for writing to redirect stdout to. If
-            no redirection should happen, then this should be None.
-        stderr_file: A file handle opened for writing to redirect stderr to. If
-            no redirection should happen, then this should be None.
-        use_one_memory_mapped_file: If True, then the store will use only a
-            single memory-mapped file.
-    Return:
-        A tuple of the name of the plasma store socket and the process ID of
-            the plasma store process.
-    """
-    if use_valgrind and use_profiler:
-        raise Exception("Cannot use valgrind and profiler at the same time.")
-
-    tmpdir = tempfile.mkdtemp(prefix='test_plasma-')
-    try:
-        plasma_store_name = os.path.join(tmpdir, 'plasma.sock')
-        plasma_store_executable = os.path.join(pa.__path__[0], "plasma_store")
-        command = [plasma_store_executable,
-                   "-s", plasma_store_name,
-                   "-m", str(plasma_store_memory)]
-        if use_one_memory_mapped_file:
-            command += ["-f"]
-        if plasma_directory:
-            command += ["-d", plasma_directory]
-        if use_hugepages:
-            command += ["-h"]
-        stdout_file = None
-        stderr_file = None
-        if use_valgrind:
-            command = ["valgrind",
-                       "--track-origins=yes",
-                       "--leak-check=full",
-                       "--show-leak-kinds=all",
-                       "--leak-check-heuristics=stdstring",
-                       "--error-exitcode=1"] + command
-            proc = subprocess.Popen(command, stdout=stdout_file,
-                                    stderr=stderr_file)
-            time.sleep(1.0)
-        elif use_profiler:
-            command = ["valgrind", "--tool=callgrind"] + command
-            proc = subprocess.Popen(command, stdout=stdout_file,
-                                    stderr=stderr_file)
-            time.sleep(1.0)
-        else:
-            proc = subprocess.Popen(command, stdout=stdout_file,
-                                    stderr=stderr_file)
-            time.sleep(0.1)
-        rc = proc.poll()
-        if rc is not None:
-            raise RuntimeError("plasma_store exited unexpectedly with "
-                               "code %d" % (rc,))
-
-        yield plasma_store_name, proc
-    finally:
-        if proc.poll() is None:
-            proc.kill()
-        shutil.rmtree(tmpdir)
-
-
 @pytest.mark.plasma
 class TestPlasmaClient(object):
 
@@ -185,7 +110,8 @@ class TestPlasmaClient(object):
 
         import pyarrow.plasma as plasma
         # Start Plasma store.
-        self.plasma_store_ctx = start_plasma_store(
+        self.plasma_store_ctx = plasma.start_plasma_store(
+            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
             use_valgrind=USE_VALGRIND,
             use_one_memory_mapped_file=use_one_memory_mapped_file)
         plasma_store_name, self.p = self.plasma_store_ctx.__enter__()
@@ -839,8 +765,10 @@ def test_object_id_equality_operators():
                     reason="requires hugepage support")
 def test_use_huge_pages():
     import pyarrow.plasma as plasma
-    with start_plasma_store(plasma_directory="/mnt/hugepages",
-                            use_hugepages=True) as (plasma_store_name, p):
+    with plasma.start_plasma_store(
+            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY,
+            plasma_directory="/mnt/hugepages",
+            use_hugepages=True) as (plasma_store_name, p):
         plasma_client = plasma.connect(plasma_store_name, "", 64)
         create_object(plasma_client, 100000000)
 
@@ -852,7 +780,9 @@ def test_use_huge_pages():
 def test_plasma_client_sharing():
     import pyarrow.plasma as plasma
 
-    with start_plasma_store() as (plasma_store_name, p):
+    with plasma.start_plasma_store(
+            plasma_store_memory=DEFAULT_PLASMA_STORE_MEMORY) \
+            as (plasma_store_name, p):
         plasma_client = plasma.connect(plasma_store_name, "", 64)
         object_id = plasma_client.put(np.zeros(3))
         buf = plasma_client.get(object_id)
diff --git a/python/setup.py b/python/setup.py
index 8d26e09..2e0d982 100644
--- a/python/setup.py
+++ b/python/setup.py
@@ -144,7 +144,7 @@ class build_ext(_build_ext):
         'lib',
         '_parquet',
         '_orc',
-        'plasma']
+        '_plasma']
 
     def _run_cmake(self):
         # The directory containing this setup.py
@@ -328,7 +328,7 @@ class build_ext(_build_ext):
     def _failure_permitted(self, name):
         if name == '_parquet' and not self.with_parquet:
             return True
-        if name == 'plasma' and not self.with_plasma:
+        if name == '_plasma' and not self.with_plasma:
             return True
         if name == '_orc' and not self.with_orc:
             return True

-- 
To stop receiving notification emails like this one, please contact
pcmoritz@apache.org.