You are viewing a plain-text version of this content. (The link to the canonical version, originally "here", was lost in the plain-text conversion.)
Posted to commits@arrow.apache.org by we...@apache.org on 2019/03/07 23:40:52 UTC
[arrow] branch master updated: ARROW-4797: [Plasma] Allow client to check store capacity and avoid server crash
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 7280916 ARROW-4797: [Plasma] Allow client to check store capacity and avoid server crash
7280916 is described below
commit 728091659eb19e6f11ea0f9d8653447d70846031
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Thu Mar 7 17:40:45 2019 -0600
ARROW-4797: [Plasma] Allow client to check store capacity and avoid server crash
Author: Philipp Moritz <pc...@gmail.com>
Closes #3837 from pcmoritz/plasma-store-capacity and squashes the following commits:
42780112 <Philipp Moritz> linting
cab036d1 <Philipp Moritz> allow client to check store capacity
---
cpp/src/plasma/client.cc | 4 ++++
cpp/src/plasma/client.h | 5 +++++
cpp/src/plasma/store.cc | 5 ++++-
python/pyarrow/_plasma.pyx | 14 ++++++++++++++
python/pyarrow/tests/test_plasma.py | 8 ++++++++
5 files changed, 35 insertions(+), 1 deletion(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index f08d6ef..80cf718 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -200,6 +200,8 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
bool IsInUse(const ObjectID& object_id);
+ int64_t store_capacity() { return store_capacity_; }  // In bytes; no store round trip.
+
private:
/// Check if store_fd has already been received from the store. If yes,
/// return it. Otherwise, receive it from the store (see analogous logic
@@ -968,4 +970,6 @@ bool PlasmaClient::IsInUse(const ObjectID& object_id) {
return impl_->IsInUse(object_id);
}
+int64_t PlasmaClient::store_capacity() { return impl_->store_capacity(); }  // In bytes.
+
} // namespace plasma
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index ac9e8eb..8d1d6a7 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -250,6 +250,11 @@ class ARROW_EXPORT PlasmaClient {
/// \return The return status.
Status Disconnect();
+ /// Get the memory capacity of the store, as held in this client's state.
+ ///
+ /// \return Memory capacity of the store in bytes.
+ int64_t store_capacity();
+
private:
friend class PlasmaBuffer;
FRIEND_TEST(TestPlasmaStore, GetTest);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index c55da30..a3f08e7 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -1131,14 +1131,17 @@ int main(int argc, char* argv[]) {
// shm_vfs_stats.f_bavail is the number of available blocks.
int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
close(shm_fd);
+ // Keep some safety margin for allocator fragmentation.
+ shm_mem_avail = 9 * shm_mem_avail / 10;
if (system_memory > shm_mem_avail) {
- ARROW_LOG(FATAL)
+ ARROW_LOG(WARNING)
<< "System memory request exceeds memory available in " << plasma_directory
<< ". The request is for " << system_memory
<< " bytes, and the amount available is " << shm_mem_avail
<< " bytes. You may be able to free up space by deleting files in "
"/dev/shm. If you are inside a Docker container, you may need to "
"pass an argument with the flag '--shm-size' to 'docker run'.";
+ system_memory = shm_mem_avail;
}
} else {
SetMallocGranularity(1024 * 1024 * 1024); // 1 GB
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index b3868dc..04963b6 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -135,6 +135,8 @@ cdef extern from "plasma/client.h" nogil:
CStatus Delete(const c_vector[CUniqueID] object_ids)
+ int64_t store_capacity()
+
cdef extern from "plasma/client.h" nogil:
cdef struct CObjectBuffer" plasma::ObjectBuffer":
@@ -758,6 +760,18 @@ cdef class PlasmaClient:
inc(it)
return result
+ def store_capacity(self):
+ """
+ Get the memory capacity of the store.
+
+ Returns
+ -------
+ int
+ The memory capacity of the store in bytes, as reported by the
+ underlying C++ plasma client.
+ """
+ return self.client.get().store_capacity()
+
def connect(store_socket_name, manager_socket_name=None, int release_delay=0,
int num_retries=-1):
diff --git a/python/pyarrow/tests/test_plasma.py b/python/pyarrow/tests/test_plasma.py
index ef53bab..c8a4d33 100644
--- a/python/pyarrow/tests/test_plasma.py
+++ b/python/pyarrow/tests/test_plasma.py
@@ -1083,3 +1083,11 @@ def test_object_id_randomness():
first_object_id = subprocess.check_output(["python", "-c", cmd])
second_object_id = subprocess.check_output(["python", "-c", cmd])
assert first_object_id != second_object_id
+
+
+@pytest.mark.plasma
+def test_store_capacity():
+ import pyarrow.plasma as plasma
+ with plasma.start_plasma_store(plasma_store_memory=10000) as (name, p):
+ plasma_client = plasma.connect(name)
+ assert plasma_client.store_capacity() == 10000  # same as plasma_store_memory