You are viewing a plain-text version of this content. The canonical link to it (originally the hyperlink "here") is not preserved in this plain-text rendering.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/03/07 23:40:52 UTC

[arrow] branch master updated: ARROW-4797: [Plasma] Allow client to check store capacity and avoid server crash

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 7280916  ARROW-4797: [Plasma] Allow client to check store capacity and avoid server crash
7280916 is described below

commit 728091659eb19e6f11ea0f9d8653447d70846031
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Thu Mar 7 17:40:45 2019 -0600

    ARROW-4797: [Plasma] Allow client to check store capacity and avoid server crash
    
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #3837 from pcmoritz/plasma-store-capacity and squashes the following commits:
    
    42780112 <Philipp Moritz> linting
    cab036d1 <Philipp Moritz> allow client to check store capacity
---
 cpp/src/plasma/client.cc            |  4 ++++
 cpp/src/plasma/client.h             |  5 +++++
 cpp/src/plasma/store.cc             |  5 ++++-
 python/pyarrow/_plasma.pyx          | 14 ++++++++++++++
 python/pyarrow/tests/test_plasma.py |  8 ++++++++
 5 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index f08d6ef..80cf718 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -200,6 +200,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
 
   bool IsInUse(const ObjectID& object_id);
 
+  int64_t store_capacity() { return store_capacity_; }
+
  private:
   /// Check if store_fd has already been received from the store. If yes,
   /// return it. Otherwise, receive it from the store (see analogous logic
@@ -968,4 +970,6 @@ bool PlasmaClient::IsInUse(const ObjectID& object_id) {
   return impl_->IsInUse(object_id);
 }
 
+int64_t PlasmaClient::store_capacity() { return impl_->store_capacity(); }
+
 }  // namespace plasma
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index ac9e8eb..8d1d6a7 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -250,6 +250,11 @@ class ARROW_EXPORT PlasmaClient {
   /// \return The return status.
   Status Disconnect();
 
+  /// Get the memory capacity of the store.
+  ///
+  /// \return Memory capacity of the store in bytes.
+  int64_t store_capacity();
+
  private:
   friend class PlasmaBuffer;
   FRIEND_TEST(TestPlasmaStore, GetTest);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index c55da30..a3f08e7 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -1131,14 +1131,17 @@ int main(int argc, char* argv[]) {
     // shm_vfs_stats.f_bavail is the number of available blocks.
     int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
     close(shm_fd);
+    // Keep some safety margin for allocator fragmentation.
+    shm_mem_avail = 9 * shm_mem_avail / 10;
     if (system_memory > shm_mem_avail) {
-      ARROW_LOG(FATAL)
+      ARROW_LOG(WARNING)
           << "System memory request exceeds memory available in " << plasma_directory
           << ". The request is for " << system_memory
           << " bytes, and the amount available is " << shm_mem_avail
           << " bytes. You may be able to free up space by deleting files in "
              "/dev/shm. If you are inside a Docker container, you may need to "
              "pass an argument with the flag '--shm-size' to 'docker run'.";
+      system_memory = shm_mem_avail;
     }
   } else {
     SetMallocGranularity(1024 * 1024 * 1024);  // 1 GB
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index b3868dc..04963b6 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -135,6 +135,8 @@ cdef extern from "plasma/client.h" nogil:
 
         CStatus Delete(const c_vector[CUniqueID] object_ids)
 
+        int64_t store_capacity()
+
 cdef extern from "plasma/client.h" nogil:
 
     cdef struct CObjectBuffer" plasma::ObjectBuffer":
@@ -758,6 +760,18 @@ cdef class PlasmaClient:
             inc(it)
         return result
 
+    def store_capacity(self):
+        """
+        Get the memory capacity of the store.
+
+        Returns
+        -------
+
+        int
+            The memory capacity of the store in bytes.
+        """
+        return self.client.get().store_capacity()
+
 
 def connect(store_socket_name, manager_socket_name=None, int release_delay=0,
             int num_retries=-1):
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index ef53bab..c8a4d33 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -1083,3 +1083,11 @@ def test_object_id_randomness():
     first_object_id = subprocess.check_output(["python", "-c", cmd])
     second_object_id = subprocess.check_output(["python", "-c", cmd])
     assert first_object_id != second_object_id
+
+
+@pytest.mark.plasma
+def test_store_capacity():
+    import pyarrow.plasma as plasma
+    with plasma.start_plasma_store(plasma_store_memory=10000) as (name, p):
+        plasma_client = plasma.connect(name)
+        assert plasma_client.store_capacity() == 10000