You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2018/07/17 22:44:26 UTC
[arrow] branch master updated: ARROW-2690: [Plasma] Use uniform
function names in public APIs in Plasma. Add namespace around Flatbuffers
This is an automated email from the ASF dual-hosted git repository.
pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5063b33 ARROW-2690: [Plasma] Use uniform function names in public APIs in Plasma. Add namespace around Flatbuffers
5063b33 is described below
commit 5063b333cf130d51a79f229d6b987c192773ab07
Author: Wes McKinney <we...@apache.org>
AuthorDate: Tue Jul 17 15:44:18 2018 -0700
ARROW-2690: [Plasma] Use uniform function names in public APIs in Plasma. Add namespace around Flatbuffers
I made a pass over Plasma to make the function names more uniform in style; let me know if you have any comments. It was a little bit painful.
I also wanted to be more explicit about which parts of the Flatbuffers protocol are leaking into the public API. `flatbuffers/flatbuffers.h` is still leaking, but this is a step toward containing things a bit.
Author: Wes McKinney <we...@apache.org>
Closes #2242 from wesm/ARROW-2690 and squashes the following commits:
6d694ca0 <Wes McKinney> Rename some more internal functions
fc66f533 <Wes McKinney> Use more distinguishable namespace alias than flatbuf
1bb90c32 <Wes McKinney> Use uniform function names in public APIs in Plasma. Add namespace around Flatbuffers
---
cpp/src/plasma/client.cc | 67 ++++----
cpp/src/plasma/common.cc | 16 +-
cpp/src/plasma/common.h | 14 +-
cpp/src/plasma/eviction_policy.cc | 41 +++--
cpp/src/plasma/eviction_policy.h | 26 ++--
cpp/src/plasma/format/common.fbs | 2 +
cpp/src/plasma/format/plasma.fbs | 1 +
cpp/src/plasma/io.cc | 12 +-
cpp/src/plasma/io.h | 16 +-
cpp/src/plasma/malloc.cc | 6 +-
cpp/src/plasma/malloc.h | 6 +-
cpp/src/plasma/plasma.cc | 12 +-
cpp/src/plasma/plasma.h | 16 +-
cpp/src/plasma/protocol.cc | 220 +++++++++++++-------------
cpp/src/plasma/protocol.h | 5 +-
cpp/src/plasma/store.cc | 240 +++++++++++++++--------------
cpp/src/plasma/store.h | 49 +++---
cpp/src/plasma/test/serialization_tests.cc | 12 +-
18 files changed, 403 insertions(+), 358 deletions(-)
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index 95da089..f2b0b97 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -66,8 +66,13 @@ using arrow::gpu::CudaDeviceManager;
#define XXH64_DEFAULT_SEED 0
+namespace fb = plasma::flatbuf;
+
namespace plasma {
+using fb::MessageType;
+using fb::PlasmaError;
+
using arrow::MutableBuffer;
typedef struct XXH64_state_s XXH64_state_t;
@@ -225,17 +230,17 @@ class PlasmaClient::Impl : public std::enable_shared_from_this<PlasmaClient::Imp
const ObjectID&, const std::shared_ptr<Buffer>&)>& wrap_buffer,
ObjectBuffer* object_buffers);
- uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
+ uint8_t* LookupOrMmap(int fd, int store_fd_val, int64_t map_size);
- uint8_t* lookup_mmapped_file(int store_fd_val);
+ uint8_t* LookupMmappedFile(int store_fd_val);
- void increment_object_count(const ObjectID& object_id, PlasmaObject* object,
- bool is_sealed);
+ void IncrementObjectCount(const ObjectID& object_id, PlasmaObject* object,
+ bool is_sealed);
- bool compute_object_hash_parallel(XXH64_state_t* hash_state, const unsigned char* data,
- int64_t nbytes);
+ bool ComputeObjectHashParallel(XXH64_state_t* hash_state, const unsigned char* data,
+ int64_t nbytes);
- uint64_t compute_object_hash(const ObjectBuffer& obj_buffer);
+ uint64_t ComputeObjectHash(const ObjectBuffer& obj_buffer);
/// File descriptor of the Unix domain socket that connects to the store.
int store_conn_;
@@ -284,7 +289,7 @@ PlasmaClient::Impl::~Impl() {}
// If the file descriptor fd has been mmapped in this client process before,
// return the pointer that was returned by mmap, otherwise mmap it and store the
// pointer in a hash table.
-uint8_t* PlasmaClient::Impl::lookup_or_mmap(int fd, int store_fd_val, int64_t map_size) {
+uint8_t* PlasmaClient::Impl::LookupOrMmap(int fd, int store_fd_val, int64_t map_size) {
auto entry = mmap_table_.find(store_fd_val);
if (entry != mmap_table_.end()) {
close(fd);
@@ -310,7 +315,7 @@ uint8_t* PlasmaClient::Impl::lookup_or_mmap(int fd, int store_fd_val, int64_t ma
// Get a pointer to a file that we know has been memory mapped in this client
// process before.
-uint8_t* PlasmaClient::Impl::lookup_mmapped_file(int store_fd_val) {
+uint8_t* PlasmaClient::Impl::LookupMmappedFile(int store_fd_val) {
auto entry = mmap_table_.find(store_fd_val);
ARROW_CHECK(entry != mmap_table_.end());
return entry->second.pointer;
@@ -321,8 +326,8 @@ bool PlasmaClient::Impl::IsInUse(const ObjectID& object_id) {
return (elem != objects_in_use_.end());
}
-void PlasmaClient::Impl::increment_object_count(const ObjectID& object_id,
- PlasmaObject* object, bool is_sealed) {
+void PlasmaClient::Impl::IncrementObjectCount(const ObjectID& object_id,
+ PlasmaObject* object, bool is_sealed) {
// Increment the count of the object to track the fact that it is being used.
// The corresponding decrement should happen in PlasmaClient::Release.
auto elem = objects_in_use_.find(object_id);
@@ -383,7 +388,7 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
// The metadata should come right after the data.
ARROW_CHECK(object.metadata_offset == object.data_offset + data_size);
*data = std::make_shared<MutableBuffer>(
- lookup_or_mmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
+ LookupOrMmap(fd, store_fd, mmap_size) + object.data_offset, data_size);
// If plasma_create is being called from a transfer, then we will not copy the
// metadata here. The metadata will be written along with the data streamed
// from the transfer.
@@ -414,13 +419,13 @@ Status PlasmaClient::Impl::Create(const ObjectID& object_id, int64_t data_size,
// client is using. A call to PlasmaClient::Release is required to decrement
// this
// count. Cache the reference to the object.
- increment_object_count(object_id, &object, false);
+ IncrementObjectCount(object_id, &object, false);
// We increment the count a second time (and the corresponding decrement will
// happen in a PlasmaClient::Release call in plasma_seal) so even if the
// buffer
// returned by PlasmaClient::Dreate goes out of scope, the object does not get
// released before the call to PlasmaClient::Seal happens.
- increment_object_count(object_id, &object, false);
+ IncrementObjectCount(object_id, &object, false);
return Status::OK();
}
@@ -446,7 +451,7 @@ Status PlasmaClient::Impl::GetBuffers(
std::shared_ptr<Buffer> physical_buf;
if (object->device_num == 0) {
- uint8_t* data = lookup_mmapped_file(object->store_fd);
+ uint8_t* data = LookupMmappedFile(object->store_fd);
physical_buf = std::make_shared<Buffer>(
data + object->data_offset, object->data_size + object->metadata_size);
} else {
@@ -463,7 +468,7 @@ Status PlasmaClient::Impl::GetBuffers(
object_buffers[i].device_num = object->device_num;
// Increment the count of the number of instances of this object that this
// client is using. Cache the reference to the object.
- increment_object_count(object_ids[i], object, true);
+ IncrementObjectCount(object_ids[i], object, true);
}
}
@@ -490,7 +495,7 @@ Status PlasmaClient::Impl::GetBuffers(
for (size_t i = 0; i < store_fds.size(); i++) {
int fd = recv_fd(store_conn_);
ARROW_CHECK(fd >= 0);
- lookup_or_mmap(fd, store_fds[i], mmap_sizes[i]);
+ LookupOrMmap(fd, store_fds[i], mmap_sizes[i]);
}
for (int64_t i = 0; i < num_objects; ++i) {
@@ -509,7 +514,7 @@ Status PlasmaClient::Impl::GetBuffers(
if (object->data_size != -1) {
std::shared_ptr<Buffer> physical_buf;
if (object->device_num == 0) {
- uint8_t* data = lookup_mmapped_file(object->store_fd);
+ uint8_t* data = LookupMmappedFile(object->store_fd);
physical_buf = std::make_shared<Buffer>(
data + object->data_offset, object->data_size + object->metadata_size);
} else {
@@ -539,7 +544,7 @@ Status PlasmaClient::Impl::GetBuffers(
object_buffers[i].device_num = object->device_num;
// Increment the count of the number of instances of this object that this
// client is using. Cache the reference to the object.
- increment_object_count(received_object_ids[i], object, true);
+ IncrementObjectCount(received_object_ids[i], object, true);
} else {
// The object was not retrieved. The caller can detect this condition
// by checking the boolean value of the metadata/data buffers.
@@ -693,9 +698,9 @@ static void ComputeBlockHash(const unsigned char* data, int64_t nbytes, uint64_t
*hash = XXH64_digest(&hash_state);
}
-bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
- const unsigned char* data,
- int64_t nbytes) {
+bool PlasmaClient::Impl::ComputeObjectHashParallel(XXH64_state_t* hash_state,
+ const unsigned char* data,
+ int64_t nbytes) {
// Note that this function will likely be faster if the address of data is
// aligned on a 64-byte boundary.
auto pool = arrow::internal::GetCpuThreadPool();
@@ -729,7 +734,7 @@ bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
return true;
}
-uint64_t PlasmaClient::Impl::compute_object_hash(const ObjectBuffer& obj_buffer) {
+uint64_t PlasmaClient::Impl::ComputeObjectHash(const ObjectBuffer& obj_buffer) {
DCHECK(obj_buffer.metadata);
DCHECK(obj_buffer.data);
XXH64_state_t hash_state;
@@ -739,7 +744,7 @@ uint64_t PlasmaClient::Impl::compute_object_hash(const ObjectBuffer& obj_buffer)
}
XXH64_reset(&hash_state, XXH64_DEFAULT_SEED);
if (obj_buffer.data->size() >= kBytesInMB) {
- compute_object_hash_parallel(
+ ComputeObjectHashParallel(
&hash_state, reinterpret_cast<const unsigned char*>(obj_buffer.data->data()),
obj_buffer.data->size());
} else {
@@ -850,7 +855,7 @@ Status PlasmaClient::Impl::Hash(const ObjectID& object_id, uint8_t* digest) {
return Status::PlasmaObjectNonexistent("Object not found");
}
// Compute the hash.
- uint64_t hash = compute_object_hash(object_buffers[0]);
+ uint64_t hash = ComputeObjectHash(object_buffers[0]);
memcpy(digest, &hash, sizeof(hash));
return Status::OK();
}
@@ -877,11 +882,11 @@ Status PlasmaClient::Impl::Subscribe(int* fd) {
Status PlasmaClient::Impl::GetNotification(int fd, ObjectID* object_id,
int64_t* data_size, int64_t* metadata_size) {
- auto notification = read_message_async(fd);
+ auto notification = ReadMessageAsync(fd);
if (notification == NULL) {
return Status::IOError("Failed to read object notification from Plasma socket");
}
- auto object_info = flatbuffers::GetRoot<ObjectInfo>(notification.get());
+ auto object_info = flatbuffers::GetRoot<fb::ObjectInfo>(notification.get());
ARROW_CHECK(object_info->object_id()->size() == sizeof(ObjectID));
memcpy(object_id, object_info->object_id()->data(), sizeof(ObjectID));
if (object_info->is_deletion()) {
@@ -977,18 +982,18 @@ Status PlasmaClient::Impl::Wait(int64_t num_object_requests,
*num_objects_ready = 0;
for (int i = 0; i < num_object_requests; ++i) {
ObjectRequestType type = object_requests[i].type;
- ObjectStatus status = object_requests[i].status;
+ fb::ObjectStatus status = object_requests[i].status;
switch (type) {
case ObjectRequestType::PLASMA_QUERY_LOCAL:
- if (status == ObjectStatus::Local) {
+ if (status == fb::ObjectStatus::Local) {
*num_objects_ready += 1;
}
break;
case ObjectRequestType::PLASMA_QUERY_ANYWHERE:
- if (status == ObjectStatus::Local || status == ObjectStatus::Remote) {
+ if (status == fb::ObjectStatus::Local || status == fb::ObjectStatus::Remote) {
*num_objects_ready += 1;
} else {
- ARROW_CHECK(status == ObjectStatus::Nonexistent);
+ ARROW_CHECK(status == fb::ObjectStatus::Nonexistent);
}
break;
default:
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
index ae55fb9..f91b963 100644
--- a/cpp/src/plasma/common.cc
+++ b/cpp/src/plasma/common.cc
@@ -23,6 +23,8 @@
#include "plasma/plasma_generated.h"
+namespace fb = plasma::flatbuf;
+
namespace plasma {
using arrow::Status;
@@ -123,15 +125,15 @@ bool UniqueID::operator==(const UniqueID& rhs) const {
return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
}
-Status plasma_error_status(PlasmaError plasma_error) {
+Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
switch (plasma_error) {
- case PlasmaError::OK:
+ case fb::PlasmaError::OK:
return Status::OK();
- case PlasmaError::ObjectExists:
+ case fb::PlasmaError::ObjectExists:
return Status::PlasmaObjectExists("object already exists in the plasma store");
- case PlasmaError::ObjectNonexistent:
+ case fb::PlasmaError::ObjectNonexistent:
return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
- case PlasmaError::OutOfMemory:
+ case fb::PlasmaError::OutOfMemory:
return Status::PlasmaStoreFull("object does not fit in the plasma store");
default:
ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
@@ -139,8 +141,8 @@ Status plasma_error_status(PlasmaError plasma_error) {
return Status::OK();
}
-ARROW_EXPORT ObjectStatus ObjectStatusLocal = ObjectStatus::Local;
-ARROW_EXPORT ObjectStatus ObjectStatusRemote = ObjectStatus::Remote;
+ARROW_EXPORT fb::ObjectStatus ObjectStatusLocal = fb::ObjectStatus::Local;
+ARROW_EXPORT fb::ObjectStatus ObjectStatusRemote = fb::ObjectStatus::Remote;
const PlasmaStoreInfo* plasma_config;
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index 90cd6a0..5e7c7d4 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -31,11 +31,15 @@
#include "arrow/status.h"
#include "arrow/util/logging.h"
+namespace plasma {
+
+namespace flatbuf {
+
// Forward declaration outside the namespace, which is defined in plasma_generated.h.
enum class PlasmaError : int32_t;
enum class ObjectStatus : int32_t;
-namespace plasma {
+} // namespace flatbuf
constexpr int64_t kUniqueIDSize = 20;
@@ -58,7 +62,7 @@ static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");
typedef UniqueID ObjectID;
-arrow::Status plasma_error_status(PlasmaError plasma_error);
+arrow::Status PlasmaErrorStatus(flatbuf::PlasmaError plasma_error);
/// Size of object hash digests.
constexpr int64_t kDigestSize = sizeof(uint64_t);
@@ -87,11 +91,11 @@ struct ObjectRequest {
/// - ObjectStatus::Nonexistent: object does not exist in the system.
/// - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
/// for being transferred or it is transferring.
- ObjectStatus status;
+ flatbuf::ObjectStatus status;
};
-extern ObjectStatus ObjectStatusLocal;
-extern ObjectStatus ObjectStatusRemote;
+extern flatbuf::ObjectStatus ObjectStatusLocal;
+extern flatbuf::ObjectStatus ObjectStatusRemote;
/// Globally accessible reference to plasma store configuration.
/// TODO(pcm): This can be avoided with some refactoring of existing code
diff --git a/cpp/src/plasma/eviction_policy.cc b/cpp/src/plasma/eviction_policy.cc
index 66a3b2e..ebe4e1a 100644
--- a/cpp/src/plasma/eviction_policy.cc
+++ b/cpp/src/plasma/eviction_policy.cc
@@ -21,7 +21,7 @@
namespace plasma {
-void LRUCache::add(const ObjectID& key, int64_t size) {
+void LRUCache::Add(const ObjectID& key, int64_t size) {
auto it = item_map_.find(key);
ARROW_CHECK(it == item_map_.end());
/* Note that it is important to use a list so the iterators stay valid. */
@@ -29,15 +29,15 @@ void LRUCache::add(const ObjectID& key, int64_t size) {
item_map_.emplace(key, item_list_.begin());
}
-void LRUCache::remove(const ObjectID& key) {
+void LRUCache::Remove(const ObjectID& key) {
auto it = item_map_.find(key);
ARROW_CHECK(it != item_map_.end());
item_list_.erase(it->second);
item_map_.erase(it);
}
-int64_t LRUCache::choose_objects_to_evict(int64_t num_bytes_required,
- std::vector<ObjectID>* objects_to_evict) {
+int64_t LRUCache::ChooseObjectsToEvict(int64_t num_bytes_required,
+ std::vector<ObjectID>* objects_to_evict) {
int64_t bytes_evicted = 0;
auto it = item_list_.end();
while (bytes_evicted < num_bytes_required && it != item_list_.begin()) {
@@ -51,13 +51,13 @@ int64_t LRUCache::choose_objects_to_evict(int64_t num_bytes_required,
EvictionPolicy::EvictionPolicy(PlasmaStoreInfo* store_info)
: memory_used_(0), store_info_(store_info) {}
-int64_t EvictionPolicy::choose_objects_to_evict(int64_t num_bytes_required,
- std::vector<ObjectID>* objects_to_evict) {
+int64_t EvictionPolicy::ChooseObjectsToEvict(int64_t num_bytes_required,
+ std::vector<ObjectID>* objects_to_evict) {
int64_t bytes_evicted =
- cache_.choose_objects_to_evict(num_bytes_required, objects_to_evict);
+ cache_.ChooseObjectsToEvict(num_bytes_required, objects_to_evict);
/* Update the LRU cache. */
for (auto& object_id : *objects_to_evict) {
- cache_.remove(object_id);
+ cache_.Remove(object_id);
}
/* Update the number of bytes used. */
memory_used_ -= bytes_evicted;
@@ -65,16 +65,15 @@ int64_t EvictionPolicy::choose_objects_to_evict(int64_t num_bytes_required,
return bytes_evicted;
}
-void EvictionPolicy::object_created(const ObjectID& object_id) {
+void EvictionPolicy::ObjectCreated(const ObjectID& object_id) {
auto entry = store_info_->objects[object_id].get();
- cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
+ cache_.Add(object_id, entry->info.data_size + entry->info.metadata_size);
int64_t size = entry->info.data_size + entry->info.metadata_size;
memory_used_ += size;
ARROW_CHECK(memory_used_ <= store_info_->memory_capacity);
}
-bool EvictionPolicy::require_space(int64_t size,
- std::vector<ObjectID>* objects_to_evict) {
+bool EvictionPolicy::RequireSpace(int64_t size, std::vector<ObjectID>* objects_to_evict) {
/* Check if there is enough space to create the object. */
int64_t required_space = memory_used_ + size - store_info_->memory_capacity;
/* Try to free up at least as much space as we need right now but ideally
@@ -82,29 +81,29 @@ bool EvictionPolicy::require_space(int64_t size,
int64_t space_to_free = std::max(required_space, store_info_->memory_capacity / 5);
ARROW_LOG(DEBUG) << "not enough space to create this object, so evicting objects";
/* Choose some objects to evict, and update the return pointers. */
- int64_t num_bytes_evicted = choose_objects_to_evict(space_to_free, objects_to_evict);
+ int64_t num_bytes_evicted = ChooseObjectsToEvict(space_to_free, objects_to_evict);
ARROW_LOG(INFO) << "There is not enough space to create this object, so evicting "
<< objects_to_evict->size() << " objects to free up "
<< num_bytes_evicted << " bytes.";
return num_bytes_evicted >= required_space && num_bytes_evicted > 0;
}
-void EvictionPolicy::begin_object_access(const ObjectID& object_id,
- std::vector<ObjectID>* objects_to_evict) {
+void EvictionPolicy::BeginObjectAccess(const ObjectID& object_id,
+ std::vector<ObjectID>* objects_to_evict) {
/* If the object is in the LRU cache, remove it. */
- cache_.remove(object_id);
+ cache_.Remove(object_id);
}
-void EvictionPolicy::end_object_access(const ObjectID& object_id,
- std::vector<ObjectID>* objects_to_evict) {
+void EvictionPolicy::EndObjectAccess(const ObjectID& object_id,
+ std::vector<ObjectID>* objects_to_evict) {
auto entry = store_info_->objects[object_id].get();
/* Add the object to the LRU cache.*/
- cache_.add(object_id, entry->info.data_size + entry->info.metadata_size);
+ cache_.Add(object_id, entry->info.data_size + entry->info.metadata_size);
}
-void EvictionPolicy::remove_object(const ObjectID& object_id) {
+void EvictionPolicy::RemoveObject(const ObjectID& object_id) {
/* If the object is in the LRU cache, remove it. */
- cache_.remove(object_id);
+ cache_.Remove(object_id);
auto entry = store_info_->objects[object_id].get();
int64_t size = entry->info.data_size + entry->info.metadata_size;
diff --git a/cpp/src/plasma/eviction_policy.h b/cpp/src/plasma/eviction_policy.h
index d13933e..bbd3fc4 100644
--- a/cpp/src/plasma/eviction_policy.h
+++ b/cpp/src/plasma/eviction_policy.h
@@ -38,12 +38,12 @@ class LRUCache {
public:
LRUCache() {}
- void add(const ObjectID& key, int64_t size);
+ void Add(const ObjectID& key, int64_t size);
- void remove(const ObjectID& key);
+ void Remove(const ObjectID& key);
- int64_t choose_objects_to_evict(int64_t num_bytes_required,
- std::vector<ObjectID>* objects_to_evict);
+ int64_t ChooseObjectsToEvict(int64_t num_bytes_required,
+ std::vector<ObjectID>* objects_to_evict);
private:
/// A doubly-linked list containing the items in the cache and
@@ -70,7 +70,7 @@ class EvictionPolicy {
/// cache.
///
/// @param object_id The object ID of the object that was created.
- void object_created(const ObjectID& object_id);
+ void ObjectCreated(const ObjectID& object_id);
/// This method will be called when the Plasma store needs more space, perhaps
/// to create a new object. When this method is called, the eviction
@@ -82,7 +82,7 @@ class EvictionPolicy {
/// @param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
/// @return True if enough space can be freed and false otherwise.
- bool require_space(int64_t size, std::vector<ObjectID>* objects_to_evict);
+ bool RequireSpace(int64_t size, std::vector<ObjectID>* objects_to_evict);
/// This method will be called whenever an unused object in the Plasma store
/// starts to be used. When this method is called, the eviction policy will
@@ -92,8 +92,8 @@ class EvictionPolicy {
/// @param object_id The ID of the object that is now being used.
/// @param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
- void begin_object_access(const ObjectID& object_id,
- std::vector<ObjectID>* objects_to_evict);
+ void BeginObjectAccess(const ObjectID& object_id,
+ std::vector<ObjectID>* objects_to_evict);
/// This method will be called whenever an object in the Plasma store that was
/// being used is no longer being used. When this method is called, the
@@ -103,8 +103,8 @@ class EvictionPolicy {
/// @param object_id The ID of the object that is no longer being used.
/// @param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
- void end_object_access(const ObjectID& object_id,
- std::vector<ObjectID>* objects_to_evict);
+ void EndObjectAccess(const ObjectID& object_id,
+ std::vector<ObjectID>* objects_to_evict);
/// Choose some objects to evict from the Plasma store. When this method is
/// called, the eviction policy will assume that the objects chosen to be
@@ -117,13 +117,13 @@ class EvictionPolicy {
/// @param objects_to_evict The object IDs that were chosen for eviction will
/// be stored into this vector.
/// @return The total number of bytes of space chosen to be evicted.
- int64_t choose_objects_to_evict(int64_t num_bytes_required,
- std::vector<ObjectID>* objects_to_evict);
+ int64_t ChooseObjectsToEvict(int64_t num_bytes_required,
+ std::vector<ObjectID>* objects_to_evict);
/// This method will be called when an object is going to be removed
///
/// @param object_id The ID of the object that is now being used.
- void remove_object(const ObjectID& object_id);
+ void RemoveObject(const ObjectID& object_id);
private:
/// The amount of memory (in bytes) currently being used.
diff --git a/cpp/src/plasma/format/common.fbs b/cpp/src/plasma/format/common.fbs
index 4d7d285..7f66bf6 100644
--- a/cpp/src/plasma/format/common.fbs
+++ b/cpp/src/plasma/format/common.fbs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+namespace plasma.flatbuf;
+
// Object information data structure.
table ObjectInfo {
// Object ID of this object.
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 333e7d7..082ae9c 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -16,6 +16,7 @@
// under the License.
// Plasma protocol specification
+namespace plasma.flatbuf;
enum MessageType:long {
// Message that gets send when a client hangs up.
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
index d9e805c..d63ceb6 100644
--- a/cpp/src/plasma/io.cc
+++ b/cpp/src/plasma/io.cc
@@ -35,6 +35,8 @@ constexpr int64_t kConnectTimeoutMs = 100;
namespace plasma {
+using flatbuf::MessageType;
+
Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
ssize_t nbytes = 0;
size_t bytesleft = length;
@@ -111,7 +113,7 @@ Status ReadMessage(int fd, MessageType* type, std::vector<uint8_t>* buffer) {
return Status::OK();
}
-int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
+int BindIpcSock(const std::string& pathname, bool shall_listen) {
struct sockaddr_un socket_address;
int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
if (socket_fd < 0) {
@@ -160,13 +162,13 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
if (timeout < 0) {
timeout = kConnectTimeoutMs;
}
- *fd = connect_ipc_sock(pathname);
+ *fd = ConnectIpcSock(pathname);
while (*fd < 0 && num_retries > 0) {
ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname
<< ", retrying " << num_retries << " more times";
// Sleep for timeout milliseconds.
usleep(static_cast<int>(timeout * 1000));
- *fd = connect_ipc_sock(pathname);
+ *fd = ConnectIpcSock(pathname);
--num_retries;
}
// If we could not connect to the socket, exit.
@@ -178,7 +180,7 @@ Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
return Status::OK();
}
-int connect_ipc_sock(const std::string& pathname) {
+int ConnectIpcSock(const std::string& pathname) {
struct sockaddr_un socket_address;
int socket_fd;
@@ -214,7 +216,7 @@ int AcceptClient(int socket_fd) {
return client_fd;
}
-std::unique_ptr<uint8_t[]> read_message_async(int sock) {
+std::unique_ptr<uint8_t[]> ReadMessageAsync(int sock) {
int64_t size;
Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
if (!s.ok()) {
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
index 9fc2d1c..745518a 100644
--- a/cpp/src/plasma/io.h
+++ b/cpp/src/plasma/io.h
@@ -30,10 +30,14 @@
#include "arrow/status.h"
#include "plasma/compat.h"
+namespace plasma {
+
+namespace flatbuf {
+
// Forward declaration outside the namespace, which is defined in plasma_generated.h.
enum class MessageType : int64_t;
-namespace plasma {
+} // namespace flatbuf
// TODO(pcm): Replace our own custom message header (message type,
// message length, plasma protocol verion) with one that is serialized
@@ -44,22 +48,22 @@ using arrow::Status;
Status WriteBytes(int fd, uint8_t* cursor, size_t length);
-Status WriteMessage(int fd, MessageType type, int64_t length, uint8_t* bytes);
+Status WriteMessage(int fd, flatbuf::MessageType type, int64_t length, uint8_t* bytes);
Status ReadBytes(int fd, uint8_t* cursor, size_t length);
-Status ReadMessage(int fd, MessageType* type, std::vector<uint8_t>* buffer);
+Status ReadMessage(int fd, flatbuf::MessageType* type, std::vector<uint8_t>* buffer);
-int bind_ipc_sock(const std::string& pathname, bool shall_listen);
+int BindIpcSock(const std::string& pathname, bool shall_listen);
-int connect_ipc_sock(const std::string& pathname);
+int ConnectIpcSock(const std::string& pathname);
Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
int64_t timeout, int* fd);
int AcceptClient(int socket_fd);
-std::unique_ptr<uint8_t[]> read_message_async(int sock);
+std::unique_ptr<uint8_t[]> ReadMessageAsync(int sock);
} // namespace plasma
diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc
index e2403fd..3df0892 100644
--- a/cpp/src/plasma/malloc.cc
+++ b/cpp/src/plasma/malloc.cc
@@ -182,7 +182,7 @@ int fake_munmap(void* addr, int64_t size) {
return r;
}
-void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) {
+void GetMallocMapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) {
// TODO(rshin): Implement a more efficient search through mmap_records.
for (const auto& entry : mmap_records) {
if (addr >= entry.first && addr < pointer_advance(entry.first, entry.second.size)) {
@@ -197,7 +197,7 @@ void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offse
*offset = 0;
}
-int64_t get_mmap_size(int fd) {
+int64_t GetMmapSize(int fd) {
for (const auto& entry : mmap_records) {
if (entry.second.fd == fd) {
return entry.second.size;
@@ -207,4 +207,4 @@ int64_t get_mmap_size(int fd) {
return -1; // This code is never reached.
}
-void set_malloc_granularity(int value) { change_mparam(M_GRANULARITY, value); }
+void SetMallocGranularity(int value) { change_mparam(M_GRANULARITY, value); }
diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h
index c24f154..86f14f1 100644
--- a/cpp/src/plasma/malloc.h
+++ b/cpp/src/plasma/malloc.h
@@ -27,14 +27,14 @@
/// (in the client we cannot guarantee that these mmaps are contiguous).
constexpr int64_t kMmapRegionsGap = sizeof(size_t);
-void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);
+void GetMallocMapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);
/// Get the mmap size corresponding to a specific file descriptor.
///
/// @param fd The file descriptor to look up.
/// @return The size of the corresponding memory-mapped file.
-int64_t get_mmap_size(int fd);
+int64_t GetMmapSize(int fd);
-void set_malloc_granularity(int value);
+void SetMallocGranularity(int value);
#endif // MALLOC_H
diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
index d98cbb9..601a612 100644
--- a/cpp/src/plasma/plasma.cc
+++ b/cpp/src/plasma/plasma.cc
@@ -24,6 +24,8 @@
#include "plasma/common.h"
#include "plasma/protocol.h"
+namespace fb = plasma::flatbuf;
+
namespace plasma {
extern "C" {
@@ -37,7 +39,7 @@ ObjectTableEntry::~ObjectTableEntry() {
pointer = nullptr;
}
-int warn_if_sigpipe(int status, int client_sock) {
+int WarnIfSigpipe(int status, int client_sock) {
if (status >= 0) {
return 0;
}
@@ -62,9 +64,9 @@ int warn_if_sigpipe(int status, int client_sock) {
* @return The object info buffer. It is the caller's responsibility to free
* this buffer with "delete" after it has been used.
*/
-std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info) {
+std::unique_ptr<uint8_t[]> CreateObjectInfoBuffer(fb::ObjectInfoT* object_info) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreateObjectInfo(fbb, object_info);
+ auto message = fb::CreateObjectInfo(fbb, object_info);
fbb.Finish(message);
auto notification =
std::unique_ptr<uint8_t[]>(new uint8_t[sizeof(int64_t) + fbb.GetSize()]);
@@ -73,8 +75,8 @@ std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info) {
return notification;
}
-ObjectTableEntry* get_object_table_entry(PlasmaStoreInfo* store_info,
- const ObjectID& object_id) {
+ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info,
+ const ObjectID& object_id) {
auto it = store_info->objects.find(object_id);
if (it == store_info->objects.end()) {
return NULL;
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 8cc7cac..6e15807 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -95,14 +95,14 @@ struct PlasmaObject {
int device_num;
};
-enum class object_state : int {
+enum class ObjectState : int {
/// Object was created but not sealed in the local Plasma Store.
PLASMA_CREATED = 1,
/// Object is sealed and stored in the local Plasma Store.
PLASMA_SEALED
};
-enum class object_status : int {
+enum class ObjectStatus : int {
/// The object was not found.
OBJECT_NOT_FOUND = 0,
/// The object was found.
@@ -119,7 +119,7 @@ struct ObjectTableEntry {
/// Object id of this object.
ObjectID object_id;
/// Object info like size, creation time and owner.
- ObjectInfoT info;
+ flatbuf::ObjectInfoT info;
/// Memory mapped file containing the object.
int fd;
/// Device number.
@@ -138,7 +138,7 @@ struct ObjectTableEntry {
int ref_count;
/// The state of the object, e.g., whether it is open or sealed.
- object_state state;
+ ObjectState state;
/// The digest of the object. Used to see if two objects are the same.
unsigned char digest[kDigestSize];
};
@@ -166,8 +166,8 @@ struct PlasmaStoreInfo {
/// @param object_id The object_id of the entry we are looking for.
/// @return The entry associated with the object_id or NULL if the object_id
/// is not present.
-ObjectTableEntry* get_object_table_entry(PlasmaStoreInfo* store_info,
- const ObjectID& object_id);
+ObjectTableEntry* GetObjectTableEntry(PlasmaStoreInfo* store_info,
+ const ObjectID& object_id);
/// Print a warning if the status is less than zero. This should be used to check
/// the success of messages sent to plasma clients. We print a warning instead of
@@ -183,9 +183,9 @@ ObjectTableEntry* get_object_table_entry(PlasmaStoreInfo* store_info,
/// @param client_sock The client socket. This is just used to print some extra
/// information.
/// @return The errno set.
-int warn_if_sigpipe(int status, int client_sock);
+int WarnIfSigpipe(int status, int client_sock);
-std::unique_ptr<uint8_t[]> create_object_info_buffer(ObjectInfoT* object_info);
+std::unique_ptr<uint8_t[]> CreateObjectInfoBuffer(flatbuf::ObjectInfoT* object_info);
} // namespace plasma
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 0d55505..f5ea42a 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -27,13 +27,19 @@
#include "arrow/gpu/cuda_api.h"
#endif
+namespace fb = plasma::flatbuf;
+
namespace plasma {
+using fb::MessageType;
+using fb::PlasmaError;
+using fb::PlasmaObjectSpec;
+
using flatbuffers::uoffset_t;
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
-to_flatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
- int64_t num_objects) {
+ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
+ int64_t num_objects) {
std::vector<flatbuffers::Offset<flatbuffers::String>> results;
for (int64_t i = 0; i < num_objects; i++) {
results.push_back(fbb->CreateString(object_ids[i].binary()));
@@ -53,7 +59,7 @@ Status PlasmaReceive(int sock, MessageType message_type, std::vector<uint8_t>* b
// Helper function to create a vector of elements from Data (Request/Reply struct).
// The Getter function is used to extract one element from Data.
template <typename T, typename Data, typename Getter>
-void to_vector(const Data& request, std::vector<T>* out, const Getter& getter) {
+void ToVector(const Data& request, std::vector<T>* out, const Getter& getter) {
int count = request.count();
out->clear();
out->reserve(count);
@@ -74,16 +80,16 @@ Status PlasmaSend(int sock, MessageType message_type, flatbuffers::FlatBufferBui
Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
int64_t metadata_size, int device_num) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.binary()),
- data_size, metadata_size, device_num);
+ auto message = fb::CreatePlasmaCreateRequest(fbb, fbb.CreateString(object_id.binary()),
+ data_size, metadata_size, device_num);
return PlasmaSend(sock, MessageType::PlasmaCreateRequest, &fbb, message);
}
Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
int64_t* data_size, int64_t* metadata_size, int* device_num) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*data_size = message->data_size();
*metadata_size = message->metadata_size();
*object_id = ObjectID::from_binary(message->object_id()->str());
@@ -99,14 +105,15 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
object->device_num);
auto object_string = fbb.CreateString(object_id.binary());
#ifdef PLASMA_GPU
- flatbuffers::Offset<CudaHandle> ipc_handle;
+ flatbuffers::Offset<fb::CudaHandle> ipc_handle;
if (object->device_num != 0) {
std::shared_ptr<arrow::Buffer> handle;
object->ipc_handle->Serialize(arrow::default_memory_pool(), &handle);
- ipc_handle = CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size()));
+ ipc_handle =
+ fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size()));
}
#endif
- PlasmaCreateReplyBuilder crb(fbb);
+ fb::PlasmaCreateReplyBuilder crb(fbb);
crb.add_error(static_cast<PlasmaError>(error_code));
crb.add_plasma_object(&plasma_object);
crb.add_object_id(object_string);
@@ -126,8 +133,8 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
PlasmaObject* object, int* store_fd, int64_t* mmap_size) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaCreateReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
object->store_fd = message->plasma_object()->segment_index();
object->data_offset = message->plasma_object()->data_offset();
@@ -145,33 +152,33 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
&object->ipc_handle);
}
#endif
- return plasma_error_status(message->error());
+ return PlasmaErrorStatus(message->error());
}
Status SendAbortRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaAbortRequest(fbb, fbb.CreateString(object_id.binary()));
+ auto message = fb::CreatePlasmaAbortRequest(fbb, fbb.CreateString(object_id.binary()));
return PlasmaSend(sock, MessageType::PlasmaAbortRequest, &fbb, message);
}
Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaAbortRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaAbortRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Status SendAbortReply(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaAbortReply(fbb, fbb.CreateString(object_id.binary()));
+ auto message = fb::CreatePlasmaAbortReply(fbb, fbb.CreateString(object_id.binary()));
return PlasmaSend(sock, MessageType::PlasmaAbortReply, &fbb, message);
}
Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaAbortReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaAbortReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
@@ -181,16 +188,16 @@ Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
flatbuffers::FlatBufferBuilder fbb;
auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), kDigestSize);
- auto message =
- CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()), digest_string);
+ auto message = fb::CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()),
+ digest_string);
return PlasmaSend(sock, MessageType::PlasmaSealRequest, &fbb, message);
}
Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
unsigned char* digest) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaSealRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
ARROW_CHECK(message->digest()->size() == kDigestSize);
memcpy(digest, message->digest()->data(), kDigestSize);
@@ -199,30 +206,32 @@ Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
Status SendSealReply(int sock, ObjectID object_id, PlasmaError error) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaSealReply(fbb, fbb.CreateString(object_id.binary()), error);
+ auto message =
+ fb::CreatePlasmaSealReply(fbb, fbb.CreateString(object_id.binary()), error);
return PlasmaSend(sock, MessageType::PlasmaSealReply, &fbb, message);
}
Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaSealReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaSealReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
- return plasma_error_status(message->error());
+ return PlasmaErrorStatus(message->error());
}
// Release messages.
Status SendReleaseRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
+ auto message =
+ fb::CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
return PlasmaSend(sock, MessageType::PlasmaReleaseRequest, &fbb, message);
}
Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaReleaseRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
@@ -230,34 +239,36 @@ Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id) {
Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
- CreatePlasmaReleaseReply(fbb, fbb.CreateString(object_id.binary()), error);
+ fb::CreatePlasmaReleaseReply(fbb, fbb.CreateString(object_id.binary()), error);
return PlasmaSend(sock, MessageType::PlasmaReleaseReply, &fbb, message);
}
Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaReleaseReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
- return plasma_error_status(message->error());
+ return PlasmaErrorStatus(message->error());
}
// Delete objects messages.
Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids) {
flatbuffers::FlatBufferBuilder fbb;
- auto message =
- CreatePlasmaDeleteRequest(fbb, static_cast<int32_t>(object_ids.size()),
- to_flatbuffer(&fbb, &object_ids[0], object_ids.size()));
+ auto message = fb::CreatePlasmaDeleteRequest(
+ fbb, static_cast<int32_t>(object_ids.size()),
+ ToFlatbuffer(&fbb, &object_ids[0], object_ids.size()));
return PlasmaSend(sock, MessageType::PlasmaDeleteRequest, &fbb, message);
}
Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids) {
+ using fb::PlasmaDeleteRequest;
+
DCHECK(data);
DCHECK(object_ids);
auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
- to_vector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) {
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ ToVector(*message, object_ids, [](const PlasmaDeleteRequest& request, int i) {
return ObjectID::from_binary(request.object_ids()->Get(i)->str());
});
return Status::OK();
@@ -267,24 +278,26 @@ Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
const std::vector<PlasmaError>& errors) {
DCHECK(object_ids.size() == errors.size());
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaDeleteReply(
+ auto message = fb::CreatePlasmaDeleteReply(
fbb, static_cast<int32_t>(object_ids.size()),
- to_flatbuffer(&fbb, &object_ids[0], object_ids.size()),
+ ToFlatbuffer(&fbb, &object_ids[0], object_ids.size()),
fbb.CreateVector(reinterpret_cast<const int32_t*>(&errors[0]), object_ids.size()));
return PlasmaSend(sock, MessageType::PlasmaDeleteReply, &fbb, message);
}
Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
std::vector<PlasmaError>* errors) {
+ using fb::PlasmaDeleteReply;
+
DCHECK(data);
DCHECK(object_ids);
DCHECK(errors);
auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
- to_vector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) {
+ DCHECK(VerifyFlatbuffer(message, data, size));
+ ToVector(*message, object_ids, [](const PlasmaDeleteReply& request, int i) {
return ObjectID::from_binary(request.object_ids()->Get(i)->str());
});
- to_vector(*message, errors, [](const PlasmaDeleteReply& request, int i) {
+ ToVector(*message, errors, [](const PlasmaDeleteReply& request, int i) {
return static_cast<PlasmaError>(request.errors()->data()[i]);
});
return Status::OK();
@@ -295,15 +308,15 @@ Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object
Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
- CreatePlasmaStatusRequest(fbb, to_flatbuffer(&fbb, object_ids, num_objects));
+ fb::CreatePlasmaStatusRequest(fbb, ToFlatbuffer(&fbb, object_ids, num_objects));
return PlasmaSend(sock, MessageType::PlasmaStatusRequest, &fbb, message);
}
Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
int64_t num_objects) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaStatusRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
@@ -314,23 +327,23 @@ Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
int64_t num_objects) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
- CreatePlasmaStatusReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
- fbb.CreateVector(object_status, num_objects));
+ fb::CreatePlasmaStatusReply(fbb, ToFlatbuffer(&fbb, object_ids, num_objects),
+ fbb.CreateVector(object_status, num_objects));
return PlasmaSend(sock, MessageType::PlasmaStatusReply, &fbb, message);
}
int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaStatusReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
return message->object_ids()->size();
}
Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
int object_status[], int64_t num_objects) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaStatusReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
@@ -344,30 +357,31 @@ Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
Status SendContainsRequest(int sock, ObjectID object_id) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary()));
+ auto message =
+ fb::CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary()));
return PlasmaSend(sock, MessageType::PlasmaContainsRequest, &fbb, message);
}
Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaContainsRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
return Status::OK();
}
Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
flatbuffers::FlatBufferBuilder fbb;
- auto message =
- CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.binary()), has_object);
+ auto message = fb::CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.binary()),
+ has_object);
return PlasmaSend(sock, MessageType::PlasmaContainsReply, &fbb, message);
}
Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
bool* has_object) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaContainsReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
*has_object = message->has_object();
return Status::OK();
@@ -377,7 +391,7 @@ Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
Status SendConnectRequest(int sock) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaConnectRequest(fbb);
+ auto message = fb::CreatePlasmaConnectRequest(fbb);
return PlasmaSend(sock, MessageType::PlasmaConnectRequest, &fbb, message);
}
@@ -385,14 +399,14 @@ Status ReadConnectRequest(uint8_t* data) { return Status::OK(); }
Status SendConnectReply(int sock, int64_t memory_capacity) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaConnectReply(fbb, memory_capacity);
+ auto message = fb::CreatePlasmaConnectReply(fbb, memory_capacity);
return PlasmaSend(sock, MessageType::PlasmaConnectReply, &fbb, message);
}
Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaConnectReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*memory_capacity = message->memory_capacity();
return Status::OK();
}
@@ -401,28 +415,28 @@ Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity) {
Status SendEvictRequest(int sock, int64_t num_bytes) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaEvictRequest(fbb, num_bytes);
+ auto message = fb::CreatePlasmaEvictRequest(fbb, num_bytes);
return PlasmaSend(sock, MessageType::PlasmaEvictRequest, &fbb, message);
}
Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaEvictRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*num_bytes = message->num_bytes();
return Status::OK();
}
Status SendEvictReply(int sock, int64_t num_bytes) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaEvictReply(fbb, num_bytes);
+ auto message = fb::CreatePlasmaEvictReply(fbb, num_bytes);
return PlasmaSend(sock, MessageType::PlasmaEvictReply, &fbb, message);
}
Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaEvictReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
num_bytes = message->num_bytes();
return Status::OK();
}
@@ -432,16 +446,16 @@ Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes) {
Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
int64_t timeout_ms) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaGetRequest(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
- timeout_ms);
+ auto message = fb::CreatePlasmaGetRequest(
+ fbb, ToFlatbuffer(&fbb, object_ids, num_objects), timeout_ms);
return PlasmaSend(sock, MessageType::PlasmaGetRequest, &fbb, message);
}
Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
int64_t* timeout_ms) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaGetRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
auto object_id = message->object_ids()->Get(i)->str();
object_ids.push_back(ObjectID::from_binary(object_id));
@@ -457,7 +471,7 @@ Status SendGetReply(int sock, ObjectID object_ids[],
flatbuffers::FlatBufferBuilder fbb;
std::vector<PlasmaObjectSpec> objects;
- std::vector<flatbuffers::Offset<CudaHandle>> handles;
+ std::vector<flatbuffers::Offset<fb::CudaHandle>> handles;
for (int64_t i = 0; i < num_objects; ++i) {
const PlasmaObject& object = plasma_objects[object_ids[i]];
objects.push_back(PlasmaObjectSpec(object.store_fd, object.data_offset,
@@ -468,12 +482,12 @@ Status SendGetReply(int sock, ObjectID object_ids[],
std::shared_ptr<arrow::Buffer> handle;
object.ipc_handle->Serialize(arrow::default_memory_pool(), &handle);
handles.push_back(
- CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size())));
+ fb::CreateCudaHandle(fbb, fbb.CreateVector(handle->data(), handle->size())));
}
#endif
}
- auto message = CreatePlasmaGetReply(
- fbb, to_flatbuffer(&fbb, object_ids, num_objects),
+ auto message = fb::CreatePlasmaGetReply(
+ fbb, ToFlatbuffer(&fbb, object_ids, num_objects),
fbb.CreateVectorOfStructs(objects.data(), num_objects), fbb.CreateVector(store_fds),
fbb.CreateVector(mmap_sizes), fbb.CreateVector(handles));
return PlasmaSend(sock, MessageType::PlasmaGetReply, &fbb, message);
@@ -483,11 +497,11 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
PlasmaObject plasma_objects[], int64_t num_objects,
std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
+ auto message = flatbuffers::GetRoot<fb::PlasmaGetReply>(data);
#ifdef PLASMA_GPU
int handle_pos = 0;
#endif
- DCHECK(verify_flatbuffer(message, data, size));
+ DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < num_objects; ++i) {
object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
}
@@ -519,14 +533,14 @@ Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
flatbuffers::FlatBufferBuilder fbb;
auto message =
- CreatePlasmaFetchRequest(fbb, to_flatbuffer(&fbb, object_ids, num_objects));
+ fb::CreatePlasmaFetchRequest(fbb, ToFlatbuffer(&fbb, object_ids, num_objects));
return PlasmaSend(sock, MessageType::PlasmaFetchRequest, &fbb, message);
}
Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaFetchRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
}
@@ -539,23 +553,23 @@ Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_re
int num_ready_objects, int64_t timeout_ms) {
flatbuffers::FlatBufferBuilder fbb;
- std::vector<flatbuffers::Offset<ObjectRequestSpec>> object_request_specs;
+ std::vector<flatbuffers::Offset<fb::ObjectRequestSpec>> object_request_specs;
for (int i = 0; i < num_requests; i++) {
- object_request_specs.push_back(CreateObjectRequestSpec(
+ object_request_specs.push_back(fb::CreateObjectRequestSpec(
fbb, fbb.CreateString(object_requests[i].object_id.binary()),
static_cast<int>(object_requests[i].type)));
}
- auto message = CreatePlasmaWaitRequest(fbb, fbb.CreateVector(object_request_specs),
- num_ready_objects, timeout_ms);
+ auto message = fb::CreatePlasmaWaitRequest(fbb, fbb.CreateVector(object_request_specs),
+ num_ready_objects, timeout_ms);
return PlasmaSend(sock, MessageType::PlasmaWaitRequest, &fbb, message);
}
Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
int64_t* timeout_ms, int* num_ready_objects) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaWaitRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*num_ready_objects = message->num_ready_objects();
*timeout_ms = message->timeout();
@@ -565,7 +579,7 @@ Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requ
ObjectRequest object_request(
{object_id,
static_cast<ObjectRequestType>(message->object_requests()->Get(i)->type()),
- ObjectStatus::Nonexistent});
+ fb::ObjectStatus::Nonexistent});
object_requests[object_id] = object_request;
}
return Status::OK();
@@ -575,14 +589,14 @@ Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
int num_ready_objects) {
flatbuffers::FlatBufferBuilder fbb;
- std::vector<flatbuffers::Offset<ObjectReply>> object_replies;
+ std::vector<flatbuffers::Offset<fb::ObjectReply>> object_replies;
for (const auto& entry : object_requests) {
const auto& object_request = entry.second;
- object_replies.push_back(CreateObjectReply(
+ object_replies.push_back(fb::CreateObjectReply(
fbb, fbb.CreateString(object_request.object_id.binary()), object_request.status));
}
- auto message = CreatePlasmaWaitReply(
+ auto message = fb::CreatePlasmaWaitReply(
fbb, fbb.CreateVector(object_replies.data(), num_ready_objects), num_ready_objects);
return PlasmaSend(sock, MessageType::PlasmaWaitReply, &fbb, message);
}
@@ -591,8 +605,8 @@ Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[]
int* num_ready_objects) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaWaitReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*num_ready_objects = message->num_ready_objects();
for (int i = 0; i < *num_ready_objects; i++) {
object_requests[i].object_id =
@@ -606,7 +620,7 @@ Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[]
Status SendSubscribeRequest(int sock) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaSubscribeRequest(fbb);
+ auto message = fb::CreatePlasmaSubscribeRequest(fbb);
return PlasmaSend(sock, MessageType::PlasmaSubscribeRequest, &fbb, message);
}
@@ -616,15 +630,15 @@ Status SendDataRequest(int sock, ObjectID object_id, const char* address, int po
flatbuffers::FlatBufferBuilder fbb;
auto addr = fbb.CreateString(address, strlen(address));
auto message =
- CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.binary()), addr, port);
+ fb::CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.binary()), addr, port);
return PlasmaSend(sock, MessageType::PlasmaDataRequest, &fbb, message);
}
Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
int* port) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaDataRequest>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
DCHECK(message->object_id()->size() == sizeof(ObjectID));
*object_id = ObjectID::from_binary(message->object_id()->str());
*address = strdup(message->address()->c_str());
@@ -635,16 +649,16 @@ Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** a
Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
int64_t metadata_size) {
flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaDataReply(fbb, fbb.CreateString(object_id.binary()),
- object_size, metadata_size);
+ auto message = fb::CreatePlasmaDataReply(fbb, fbb.CreateString(object_id.binary()),
+ object_size, metadata_size);
return PlasmaSend(sock, MessageType::PlasmaDataReply, &fbb, message);
}
Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
int64_t* object_size, int64_t* metadata_size) {
DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaDataReply>(data);
- DCHECK(verify_flatbuffer(message, data, size));
+ auto message = flatbuffers::GetRoot<fb::PlasmaDataReply>(data);
+ DCHECK(VerifyFlatbuffer(message, data, size));
*object_id = ObjectID::from_binary(message->object_id()->str());
*object_size = static_cast<int64_t>(message->object_size());
*metadata_size = static_cast<int64_t>(message->metadata_size());
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index 1e34343..1665f0c 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -30,8 +30,11 @@ namespace plasma {
using arrow::Status;
+using flatbuf::MessageType;
+using flatbuf::PlasmaError;
+
template <class T>
-bool verify_flatbuffer(T* object, uint8_t* data, size_t size) {
+bool VerifyFlatbuffer(T* object, uint8_t* data, size_t size) {
flatbuffers::Verifier verifier(data, size);
return object->Verify(verifier);
}
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index c8bf466..999aa6a 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -65,6 +65,8 @@ using arrow::gpu::CudaContext;
using arrow::gpu::CudaDeviceManager;
#endif
+namespace fb = plasma::flatbuf;
+
namespace plasma {
extern "C" {
@@ -119,11 +121,11 @@ PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory, std::string dir
// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
PlasmaStore::~PlasmaStore() {}
-const PlasmaStoreInfo* PlasmaStore::get_plasma_store_info() { return &store_info_; }
+const PlasmaStoreInfo* PlasmaStore::GetPlasmaStoreInfo() { return &store_info_; }
// If this client is not already using the object, add the client to the
// object's list of clients, otherwise do nothing.
-void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* client) {
+void PlasmaStore::AddToClientObjectIds(ObjectTableEntry* entry, Client* client) {
// Check if this client is already using the object.
if (client->object_ids.find(entry->object_id) != client->object_ids.end()) {
return;
@@ -133,8 +135,8 @@ void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* clie
if (entry->ref_count == 0) {
// Tell the eviction policy that this object is being used.
std::vector<ObjectID> objects_to_evict;
- eviction_policy_.begin_object_access(entry->object_id, &objects_to_evict);
- delete_objects(objects_to_evict);
+ eviction_policy_.BeginObjectAccess(entry->object_id, &objects_to_evict);
+ DeleteObjects(objects_to_evict);
}
// Increase reference count.
entry->ref_count++;
@@ -144,9 +146,9 @@ void PlasmaStore::add_to_client_object_ids(ObjectTableEntry* entry, Client* clie
}
// Create a new object buffer in the hash table.
-PlasmaError PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
- int64_t metadata_size, int device_num,
- Client* client, PlasmaObject* result) {
+PlasmaError PlasmaStore::CreateObject(const ObjectID& object_id, int64_t data_size,
+ int64_t metadata_size, int device_num,
+ Client* client, PlasmaObject* result) {
ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
if (store_info_.objects.count(object_id) != 0) {
// There is already an object with the same ID in the Plasma Store, so
@@ -177,8 +179,8 @@ PlasmaError PlasmaStore::create_object(const ObjectID& object_id, int64_t data_s
// Tell the eviction policy how much space we need to create this object.
std::vector<ObjectID> objects_to_evict;
bool success =
- eviction_policy_.require_space(data_size + metadata_size, &objects_to_evict);
- delete_objects(objects_to_evict);
+ eviction_policy_.RequireSpace(data_size + metadata_size, &objects_to_evict);
+ DeleteObjects(objects_to_evict);
// Return an error to the client if not enough space could be freed to
// create the object.
if (!success) {
@@ -198,7 +200,7 @@ PlasmaError PlasmaStore::create_object(const ObjectID& object_id, int64_t data_s
int64_t map_size = 0;
ptrdiff_t offset = 0;
if (device_num == 0) {
- get_malloc_mapinfo(pointer, &fd, &map_size, &offset);
+ GetMallocMapinfo(pointer, &fd, &map_size, &offset);
assert(fd != -1);
}
auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
@@ -211,7 +213,7 @@ PlasmaError PlasmaStore::create_object(const ObjectID& object_id, int64_t data_s
entry->fd = fd;
entry->map_size = map_size;
entry->offset = offset;
- entry->state = object_state::PLASMA_CREATED;
+ entry->state = ObjectState::PLASMA_CREATED;
entry->device_num = device_num;
#ifdef PLASMA_GPU
if (device_num != 0) {
@@ -227,18 +229,18 @@ PlasmaError PlasmaStore::create_object(const ObjectID& object_id, int64_t data_s
result->metadata_size = metadata_size;
result->device_num = device_num;
// Notify the eviction policy that this object was created. This must be done
- // immediately before the call to add_to_client_object_ids so that the
+ // immediately before the call to AddToClientObjectIds so that the
// eviction policy does not have an opportunity to evict the object.
- eviction_policy_.object_created(object_id);
+ eviction_policy_.ObjectCreated(object_id);
// Record that this client is using this object.
- add_to_client_object_ids(store_info_.objects[object_id].get(), client);
+ AddToClientObjectIds(store_info_.objects[object_id].get(), client);
return PlasmaError::OK;
}
void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
DCHECK(object != NULL);
DCHECK(entry != NULL);
- DCHECK(entry->state == object_state::PLASMA_SEALED);
+ DCHECK(entry->state == ObjectState::PLASMA_SEALED);
#ifdef PLASMA_GPU
if (entry->device_num != 0) {
object->ipc_handle = entry->ipc_handle;
@@ -252,7 +254,7 @@ void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
object->device_num = entry->device_num;
}
-void PlasmaStore::return_from_get(GetRequest* get_req) {
+void PlasmaStore::ReturnFromGet(GetRequest* get_req) {
// Figure out how many file descriptors we need to send.
std::unordered_set<int> fds_to_send;
std::vector<int> store_fds;
@@ -263,14 +265,14 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
if (object.data_size != -1 && fds_to_send.count(fd) == 0 && fd != -1) {
fds_to_send.insert(fd);
store_fds.push_back(fd);
- mmap_sizes.push_back(get_mmap_size(fd));
+ mmap_sizes.push_back(GetMmapSize(fd));
}
}
// Send the get reply to the client.
Status s = SendGetReply(get_req->client->fd, &get_req->object_ids[0], get_req->objects,
get_req->object_ids.size(), store_fds, mmap_sizes);
- warn_if_sigpipe(s.ok() ? 0 : -1, get_req->client->fd);
+ WarnIfSigpipe(s.ok() ? 0 : -1, get_req->client->fd);
// If we successfully sent the get reply message to the client, then also send
// the file descriptors.
if (s.ok()) {
@@ -289,7 +291,7 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
error_code = send_fd(get_req->client->fd, store_fd);
continue;
}
- warn_if_sigpipe(error_code, get_req->client->fd);
+ WarnIfSigpipe(error_code, get_req->client->fd);
break;
}
}
@@ -316,26 +318,26 @@ void PlasmaStore::return_from_get(GetRequest* get_req) {
delete get_req;
}
-void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
+void PlasmaStore::UpdateObjectGetRequests(const ObjectID& object_id) {
auto& get_requests = object_get_requests_[object_id];
size_t index = 0;
size_t num_requests = get_requests.size();
for (size_t i = 0; i < num_requests; ++i) {
auto get_req = get_requests[index];
- auto entry = get_object_table_entry(&store_info_, object_id);
+ auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != NULL);
PlasmaObject_init(&get_req->objects[object_id], entry);
get_req->num_satisfied += 1;
// Record the fact that this client will be using this object and will
// be responsible for releasing this object.
- add_to_client_object_ids(entry, get_req->client);
+ AddToClientObjectIds(entry, get_req->client);
// If this get request is done, reply to the client.
if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
- return_from_get(get_req);
+ ReturnFromGet(get_req);
} else {
- // The call to return_from_get will remove the current element in the
+ // The call to ReturnFromGet will remove the current element in the
// array, so we only increment the counter in the else branch.
index += 1;
}
@@ -347,23 +349,23 @@ void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
object_get_requests_.erase(object_id);
}
-void PlasmaStore::process_get_request(Client* client,
- const std::vector<ObjectID>& object_ids,
- int64_t timeout_ms) {
+void PlasmaStore::ProcessGetRequest(Client* client,
+ const std::vector<ObjectID>& object_ids,
+ int64_t timeout_ms) {
// Create a get request for this object.
auto get_req = new GetRequest(client, object_ids);
for (auto object_id : object_ids) {
// Check if this object is already present locally. If so, record that the
// object is being used and mark it as accounted for.
- auto entry = get_object_table_entry(&store_info_, object_id);
- if (entry && entry->state == object_state::PLASMA_SEALED) {
+ auto entry = GetObjectTableEntry(&store_info_, object_id);
+ if (entry && entry->state == ObjectState::PLASMA_SEALED) {
// Update the get request to take into account the present object.
PlasmaObject_init(&get_req->objects[object_id], entry);
get_req->num_satisfied += 1;
// If necessary, record that this client is using this object. In the case
- // where entry == NULL, this will be called from seal_object.
- add_to_client_object_ids(entry, client);
+ // where entry == NULL, this will be called from SealObject.
+ AddToClientObjectIds(entry, client);
} else {
// Add a placeholder plasma object to the get request to indicate that the
// object is not present. This will be parsed by the client. We set the
@@ -377,18 +379,18 @@ void PlasmaStore::process_get_request(Client* client,
// If all of the objects are present already or if the timeout is 0, return to
// the client.
if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms == 0) {
- return_from_get(get_req);
+ ReturnFromGet(get_req);
} else if (timeout_ms != -1) {
// Set a timer that will cause the get request to return to the client. Note
// that a timeout of -1 is used to indicate that no timer should be set.
get_req->timer = loop_->AddTimer(timeout_ms, [this, get_req](int64_t timer_id) {
- return_from_get(get_req);
+ ReturnFromGet(get_req);
return kEventLoopTimerDone;
});
}
}
-int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client* client) {
+int PlasmaStore::RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* client) {
auto it = client->object_ids.find(entry->object_id);
if (it != client->object_ids.end()) {
client->object_ids.erase(it);
@@ -400,8 +402,8 @@ int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client*
if (entry->ref_count == 0) {
// Tell the eviction policy that this object is no longer being used.
std::vector<ObjectID> objects_to_evict;
- eviction_policy_.end_object_access(entry->object_id, &objects_to_evict);
- delete_objects(objects_to_evict);
+ eviction_policy_.EndObjectAccess(entry->object_id, &objects_to_evict);
+ DeleteObjects(objects_to_evict);
}
// Return 1 to indicate that the client was removed.
return 1;
@@ -411,42 +413,42 @@ int PlasmaStore::remove_from_client_object_ids(ObjectTableEntry* entry, Client*
}
}
-void PlasmaStore::release_object(const ObjectID& object_id, Client* client) {
- auto entry = get_object_table_entry(&store_info_, object_id);
+void PlasmaStore::ReleaseObject(const ObjectID& object_id, Client* client) {
+ auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != NULL);
// Remove the client from the object's array of clients.
- ARROW_CHECK(remove_from_client_object_ids(entry, client) == 1);
+ ARROW_CHECK(RemoveFromClientObjectIds(entry, client) == 1);
}
// Check if an object is present.
-object_status PlasmaStore::contains_object(const ObjectID& object_id) {
- auto entry = get_object_table_entry(&store_info_, object_id);
- return entry && (entry->state == object_state::PLASMA_SEALED)
- ? object_status::OBJECT_FOUND
- : object_status::OBJECT_NOT_FOUND;
+ObjectStatus PlasmaStore::ContainsObject(const ObjectID& object_id) {
+ auto entry = GetObjectTableEntry(&store_info_, object_id);
+ return entry && (entry->state == ObjectState::PLASMA_SEALED)
+ ? ObjectStatus::OBJECT_FOUND
+ : ObjectStatus::OBJECT_NOT_FOUND;
}
// Seal an object that has been created in the hash table.
-void PlasmaStore::seal_object(const ObjectID& object_id, unsigned char digest[]) {
+void PlasmaStore::SealObject(const ObjectID& object_id, unsigned char digest[]) {
ARROW_LOG(DEBUG) << "sealing object " << object_id.hex();
- auto entry = get_object_table_entry(&store_info_, object_id);
+ auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != NULL);
- ARROW_CHECK(entry->state == object_state::PLASMA_CREATED);
+ ARROW_CHECK(entry->state == ObjectState::PLASMA_CREATED);
// Set the state of object to SEALED.
- entry->state = object_state::PLASMA_SEALED;
+ entry->state = ObjectState::PLASMA_SEALED;
// Set the object digest.
entry->info.digest = std::string(reinterpret_cast<char*>(&digest[0]), kDigestSize);
// Inform all subscribers that a new object has been sealed.
- push_notification(&entry->info);
+ PushNotification(&entry->info);
// Update all get requests that involve this object.
- update_object_get_requests(object_id);
+ UpdateObjectGetRequests(object_id);
}
-int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
- auto entry = get_object_table_entry(&store_info_, object_id);
+int PlasmaStore::AbortObject(const ObjectID& object_id, Client* client) {
+ auto entry = GetObjectTableEntry(&store_info_, object_id);
ARROW_CHECK(entry != NULL) << "To abort an object it must be in the object table.";
- ARROW_CHECK(entry->state != object_state::PLASMA_SEALED)
+ ARROW_CHECK(entry->state != ObjectState::PLASMA_SEALED)
<< "To abort an object it must not have been sealed.";
auto it = client->object_ids.find(object_id);
if (it == client->object_ids.end()) {
@@ -460,8 +462,8 @@ int PlasmaStore::abort_object(const ObjectID& object_id, Client* client) {
}
}
-PlasmaError PlasmaStore::delete_object(ObjectID& object_id) {
- auto entry = get_object_table_entry(&store_info_, object_id);
+PlasmaError PlasmaStore::DeleteObject(ObjectID& object_id) {
+ auto entry = GetObjectTableEntry(&store_info_, object_id);
// TODO(rkn): This should probably not fail, but should instead throw an
// error. Maybe we should also support deleting objects that have been
// created but not sealed.
@@ -470,7 +472,7 @@ PlasmaError PlasmaStore::delete_object(ObjectID& object_id) {
return PlasmaError::ObjectNonexistent;
}
- if (entry->state != object_state::PLASMA_SEALED) {
+ if (entry->state != ObjectState::PLASMA_SEALED) {
// To delete an object it must have been sealed.
return PlasmaError::ObjectNotSealed;
}
@@ -480,40 +482,40 @@ PlasmaError PlasmaStore::delete_object(ObjectID& object_id) {
return PlasmaError::ObjectInUse;
}
- eviction_policy_.remove_object(object_id);
+ eviction_policy_.RemoveObject(object_id);
store_info_.objects.erase(object_id);
// Inform all subscribers that the object has been deleted.
- ObjectInfoT notification;
+ fb::ObjectInfoT notification;
notification.object_id = object_id.binary();
notification.is_deletion = true;
- push_notification(&notification);
+ PushNotification(&notification);
return PlasmaError::OK;
}
-void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
+void PlasmaStore::DeleteObjects(const std::vector<ObjectID>& object_ids) {
for (const auto& object_id : object_ids) {
ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
- auto entry = get_object_table_entry(&store_info_, object_id);
+ auto entry = GetObjectTableEntry(&store_info_, object_id);
// TODO(rkn): This should probably not fail, but should instead throw an
// error. Maybe we should also support deleting objects that have been
// created but not sealed.
ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table.";
- ARROW_CHECK(entry->state == object_state::PLASMA_SEALED)
+ ARROW_CHECK(entry->state == ObjectState::PLASMA_SEALED)
<< "To delete an object it must have been sealed.";
ARROW_CHECK(entry->ref_count == 0)
<< "To delete an object, there must be no clients currently using it.";
store_info_.objects.erase(object_id);
// Inform all subscribers that the object has been deleted.
- ObjectInfoT notification;
+ fb::ObjectInfoT notification;
notification.object_id = object_id.binary();
notification.is_deletion = true;
- push_notification(&notification);
+ PushNotification(&notification);
}
}
-void PlasmaStore::connect_client(int listener_sock) {
+void PlasmaStore::ConnectClient(int listener_sock) {
int client_fd = AcceptClient(listener_sock);
Client* client = new Client(client_fd);
@@ -522,7 +524,7 @@ void PlasmaStore::connect_client(int listener_sock) {
// Add a callback to handle events on this socket.
// TODO(pcm): Check return value.
loop_->AddFileEvent(client_fd, kEventLoopRead, [this, client](int events) {
- Status s = process_message(client);
+ Status s = ProcessMessage(client);
if (!s.ok()) {
ARROW_LOG(FATAL) << "Failed to process file event: " << s;
}
@@ -530,7 +532,7 @@ void PlasmaStore::connect_client(int listener_sock) {
ARROW_LOG(DEBUG) << "New connection with fd " << client_fd;
}
-void PlasmaStore::disconnect_client(int client_fd) {
+void PlasmaStore::DisconnectClient(int client_fd) {
ARROW_CHECK(client_fd > 0);
auto it = connected_clients_.find(client_fd);
ARROW_CHECK(it != connected_clients_.end());
@@ -547,18 +549,18 @@ void PlasmaStore::disconnect_client(int client_fd) {
continue;
}
- if (it->second->state == object_state::PLASMA_SEALED) {
+ if (it->second->state == ObjectState::PLASMA_SEALED) {
// Add sealed objects to a temporary list of object IDs. Do not perform
// the remove here, since it potentially modifies the object_ids table.
sealed_objects.push_back(it->second.get());
} else {
// Abort unsealed object.
- abort_object(it->first, client);
+ AbortObject(it->first, client);
}
}
for (const auto& entry : sealed_objects) {
- remove_from_client_object_ids(entry, client);
+ RemoveFromClientObjectIds(entry, client);
}
if (client->notification_fd > 0) {
@@ -577,15 +579,15 @@ void PlasmaStore::disconnect_client(int client_fd) {
}
/// Send notifications about sealed objects to the subscribers. This is called
-/// in seal_object. If the socket's send buffer is full, the notification will
+/// in SealObject. If the socket's send buffer is full, the notification will
/// be buffered, and this will be called again when the send buffer has room.
/// Since we call erase on pending_notifications_, all iterators get
/// invalidated, which is why we return a valid iterator to the next client to
-/// be used in push_notification.
+/// be used in PushNotification.
///
/// @param it Iterator that points to the client to send the notification to.
/// @return Iterator pointing to the next client.
-PlasmaStore::NotificationMap::iterator PlasmaStore::send_notifications(
+PlasmaStore::NotificationMap::iterator PlasmaStore::SendNotifications(
PlasmaStore::NotificationMap::iterator it) {
int client_fd = it->first;
auto& notifications = it->second.object_notifications;
@@ -614,7 +616,7 @@ PlasmaStore::NotificationMap::iterator PlasmaStore::send_notifications(
// TODO(pcm): Introduce status codes and check in case the file descriptor
// is added twice.
loop_->AddFileEvent(client_fd, kEventLoopWrite, [this, client_fd](int events) {
- send_notifications(pending_notifications_.find(client_fd));
+ SendNotifications(pending_notifications_.find(client_fd));
});
break;
} else {
@@ -643,26 +645,26 @@ PlasmaStore::NotificationMap::iterator PlasmaStore::send_notifications(
}
}
-void PlasmaStore::push_notification(ObjectInfoT* object_info) {
+void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info) {
auto it = pending_notifications_.begin();
while (it != pending_notifications_.end()) {
- auto notification = create_object_info_buffer(object_info);
+ auto notification = CreateObjectInfoBuffer(object_info);
it->second.object_notifications.emplace_back(std::move(notification));
- it = send_notifications(it);
+ it = SendNotifications(it);
}
}
-void PlasmaStore::push_notification(ObjectInfoT* object_info, int client_fd) {
+void PlasmaStore::PushNotification(fb::ObjectInfoT* object_info, int client_fd) {
auto it = pending_notifications_.find(client_fd);
if (it != pending_notifications_.end()) {
- auto notification = create_object_info_buffer(object_info);
+ auto notification = CreateObjectInfoBuffer(object_info);
it->second.object_notifications.emplace_back(std::move(notification));
- send_notifications(it);
+ SendNotifications(it);
}
}
// Subscribe to notifications about sealed objects.
-void PlasmaStore::subscribe_to_updates(Client* client) {
+void PlasmaStore::SubscribeToUpdates(Client* client) {
ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
if (client->notification_fd > 0) {
// This client has already subscribed. Return.
@@ -685,14 +687,14 @@ void PlasmaStore::subscribe_to_updates(Client* client) {
// Push notifications to the new subscriber about existing sealed objects.
for (const auto& entry : store_info_.objects) {
- if (entry.second->state == object_state::PLASMA_SEALED) {
- push_notification(&entry.second->info, fd);
+ if (entry.second->state == ObjectState::PLASMA_SEALED) {
+ PushNotification(&entry.second->info, fd);
}
}
}
-Status PlasmaStore::process_message(Client* client) {
- MessageType type;
+Status PlasmaStore::ProcessMessage(Client* client) {
+ fb::MessageType type;
Status s = ReadMessage(client->fd, &type, &input_buffer_);
ARROW_CHECK(s.ok() || s.IsIOError());
@@ -705,85 +707,85 @@ Status PlasmaStore::process_message(Client* client) {
// Process the different types of requests.
switch (type) {
- case MessageType::PlasmaCreateRequest: {
+ case fb::MessageType::PlasmaCreateRequest: {
int64_t data_size;
int64_t metadata_size;
int device_num;
RETURN_NOT_OK(ReadCreateRequest(input, input_size, &object_id, &data_size,
&metadata_size, &device_num));
PlasmaError error_code =
- create_object(object_id, data_size, metadata_size, device_num, client, &object);
+ CreateObject(object_id, data_size, metadata_size, device_num, client, &object);
int64_t mmap_size = 0;
if (error_code == PlasmaError::OK && device_num == 0) {
- mmap_size = get_mmap_size(object.store_fd);
+ mmap_size = GetMmapSize(object.store_fd);
}
HANDLE_SIGPIPE(
SendCreateReply(client->fd, object_id, &object, error_code, mmap_size),
client->fd);
if (error_code == PlasmaError::OK && device_num == 0) {
- warn_if_sigpipe(send_fd(client->fd, object.store_fd), client->fd);
+ WarnIfSigpipe(send_fd(client->fd, object.store_fd), client->fd);
}
} break;
- case MessageType::PlasmaAbortRequest: {
+ case fb::MessageType::PlasmaAbortRequest: {
RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
- ARROW_CHECK(abort_object(object_id, client) == 1) << "To abort an object, the only "
- "client currently using it "
- "must be the creator.";
+ ARROW_CHECK(AbortObject(object_id, client) == 1) << "To abort an object, the only "
+ "client currently using it "
+ "must be the creator.";
HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
} break;
- case MessageType::PlasmaGetRequest: {
+ case fb::MessageType::PlasmaGetRequest: {
std::vector<ObjectID> object_ids_to_get;
int64_t timeout_ms;
RETURN_NOT_OK(ReadGetRequest(input, input_size, object_ids_to_get, &timeout_ms));
- process_get_request(client, object_ids_to_get, timeout_ms);
+ ProcessGetRequest(client, object_ids_to_get, timeout_ms);
} break;
- case MessageType::PlasmaReleaseRequest: {
+ case fb::MessageType::PlasmaReleaseRequest: {
RETURN_NOT_OK(ReadReleaseRequest(input, input_size, &object_id));
- release_object(object_id, client);
+ ReleaseObject(object_id, client);
} break;
- case MessageType::PlasmaDeleteRequest: {
+ case fb::MessageType::PlasmaDeleteRequest: {
std::vector<ObjectID> object_ids;
std::vector<PlasmaError> error_codes;
RETURN_NOT_OK(ReadDeleteRequest(input, input_size, &object_ids));
error_codes.reserve(object_ids.size());
for (auto& object_id : object_ids) {
- error_codes.push_back(delete_object(object_id));
+ error_codes.push_back(DeleteObject(object_id));
}
HANDLE_SIGPIPE(SendDeleteReply(client->fd, object_ids, error_codes), client->fd);
} break;
- case MessageType::PlasmaContainsRequest: {
+ case fb::MessageType::PlasmaContainsRequest: {
RETURN_NOT_OK(ReadContainsRequest(input, input_size, &object_id));
- if (contains_object(object_id) == object_status::OBJECT_FOUND) {
+ if (ContainsObject(object_id) == ObjectStatus::OBJECT_FOUND) {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd);
} else {
HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
}
} break;
- case MessageType::PlasmaSealRequest: {
+ case fb::MessageType::PlasmaSealRequest: {
unsigned char digest[kDigestSize];
RETURN_NOT_OK(ReadSealRequest(input, input_size, &object_id, &digest[0]));
- seal_object(object_id, &digest[0]);
+ SealObject(object_id, &digest[0]);
} break;
- case MessageType::PlasmaEvictRequest: {
+ case fb::MessageType::PlasmaEvictRequest: {
// This code path should only be used for testing.
int64_t num_bytes;
RETURN_NOT_OK(ReadEvictRequest(input, input_size, &num_bytes));
std::vector<ObjectID> objects_to_evict;
int64_t num_bytes_evicted =
- eviction_policy_.choose_objects_to_evict(num_bytes, &objects_to_evict);
- delete_objects(objects_to_evict);
+ eviction_policy_.ChooseObjectsToEvict(num_bytes, &objects_to_evict);
+ DeleteObjects(objects_to_evict);
HANDLE_SIGPIPE(SendEvictReply(client->fd, num_bytes_evicted), client->fd);
} break;
- case MessageType::PlasmaSubscribeRequest:
- subscribe_to_updates(client);
+ case fb::MessageType::PlasmaSubscribeRequest:
+ SubscribeToUpdates(client);
break;
- case MessageType::PlasmaConnectRequest: {
+ case fb::MessageType::PlasmaConnectRequest: {
HANDLE_SIGPIPE(SendConnectReply(client->fd, store_info_.memory_capacity),
client->fd);
} break;
- case MessageType::PlasmaDisconnectClient:
+ case fb::MessageType::PlasmaDisconnectClient:
ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
- disconnect_client(client->fd);
+ DisconnectClient(client->fd);
break;
default:
// This code should be unreachable.
@@ -802,7 +804,7 @@ class PlasmaStoreRunner {
loop_.reset(new EventLoop);
store_.reset(
new PlasmaStore(loop_.get(), system_memory, directory, hugepages_enabled));
- plasma_config = store_->get_plasma_store_info();
+ plasma_config = store_->GetPlasmaStoreInfo();
// If the store is configured to use a single memory-mapped file, then we
// achieve that by mallocing and freeing a single large amount of space.
@@ -813,12 +815,12 @@ class PlasmaStoreRunner {
plasma::dlfree(pointer);
}
- int socket = bind_ipc_sock(socket_name, true);
+ int socket = BindIpcSock(socket_name, true);
// TODO(pcm): Check return value.
ARROW_CHECK(socket >= 0);
loop_->AddFileEvent(socket, kEventLoopRead, [this, socket](int events) {
- this->store_->connect_client(socket);
+ this->store_->ConnectClient(socket);
});
loop_->Start();
}
@@ -846,8 +848,8 @@ void HandleSignal(int signal) {
}
}
-void start_server(char* socket_name, int64_t system_memory, std::string plasma_directory,
- bool hugepages_enabled, bool use_one_memory_mapped_file) {
+void StartServer(char* socket_name, int64_t system_memory, std::string plasma_directory,
+ bool hugepages_enabled, bool use_one_memory_mapped_file) {
// Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
// to a client that has already died, the store could die.
signal(SIGPIPE, SIG_IGN);
@@ -938,13 +940,13 @@ int main(int argc, char* argv[]) {
"pass an argument with the flag '--shm-size' to 'docker run'.";
}
} else {
- set_malloc_granularity(1024 * 1024 * 1024); // 1 GB
+ SetMallocGranularity(1024 * 1024 * 1024); // 1 GB
}
#endif
// Make it so dlmalloc fails if we try to request more memory than is
// available.
plasma::dlmalloc_set_footprint_limit((size_t)system_memory);
ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
- plasma::start_server(socket_name, system_memory, plasma_directory, hugepages_enabled,
- use_one_memory_mapped_file);
+ plasma::StartServer(socket_name, system_memory, plasma_directory, hugepages_enabled,
+ use_one_memory_mapped_file);
}
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index e40f040..782e234 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -33,6 +33,9 @@
namespace plasma {
+using flatbuf::ObjectInfoT;
+using flatbuf::PlasmaError;
+
struct GetRequest;
struct NotificationQueue {
@@ -67,7 +70,7 @@ class PlasmaStore {
~PlasmaStore();
/// Get a const pointer to the internal PlasmaStoreInfo object.
- const PlasmaStoreInfo* get_plasma_store_info();
+ const PlasmaStoreInfo* GetPlasmaStoreInfo();
/// Create a new object. The client must do a call to release_object to tell
/// the store when it is done with the object.
@@ -90,9 +93,9 @@ class PlasmaStore {
/// - PlasmaError::OutOfMemory, if the store is out of memory and
/// cannot create the object. In this case, the client should not call
/// plasma_release.
- PlasmaError create_object(const ObjectID& object_id, int64_t data_size,
- int64_t metadata_size, int device_num, Client* client,
- PlasmaObject* result);
+ PlasmaError CreateObject(const ObjectID& object_id, int64_t data_size,
+ int64_t metadata_size, int device_num, Client* client,
+ PlasmaObject* result);
/// Abort a created but unsealed object. If the client is not the
/// creator, then the abort will fail.
@@ -101,7 +104,7 @@ class PlasmaStore {
/// @param client The client who created the object. If this does not
/// match the creator of the object, then the abort will fail.
/// @return 1 if the abort succeeds, else 0.
- int abort_object(const ObjectID& object_id, Client* client);
+ int AbortObject(const ObjectID& object_id, Client* client);
/// Delete an specific object by object_id that have been created in the hash table.
///
@@ -110,13 +113,13 @@ class PlasmaStore {
/// - PlasmaError::OK, if the object was delete successfully.
/// - PlasmaError::ObjectNonexistent, if ths object isn't existed.
/// - PlasmaError::ObjectInUse, if the object is in use.
- PlasmaError delete_object(ObjectID& object_id);
+ PlasmaError DeleteObject(ObjectID& object_id);
/// Delete objects that have been created in the hash table. This should only
/// be called on objects that are returned by the eviction policy to evict.
///
/// @param object_ids Object IDs of the objects to be deleted.
- void delete_objects(const std::vector<ObjectID>& object_ids);
+ void DeleteObjects(const std::vector<ObjectID>& object_ids);
/// Process a get request from a client. This method assumes that we will
/// eventually have these objects sealed. If one of the objects has not yet
@@ -129,8 +132,8 @@ class PlasmaStore {
/// @param client The client making this request.
/// @param object_ids Object IDs of the objects to be gotten.
/// @param timeout_ms The timeout for the get request in milliseconds.
- void process_get_request(Client* client, const std::vector<ObjectID>& object_ids,
- int64_t timeout_ms);
+ void ProcessGetRequest(Client* client, const std::vector<ObjectID>& object_ids,
+ int64_t timeout_ms);
/// Seal an object. The object is now immutable and can be accessed with get.
///
@@ -138,52 +141,52 @@ class PlasmaStore {
/// @param digest The digest of the object. This is used to tell if two
/// objects
/// with the same object ID are the same.
- void seal_object(const ObjectID& object_id, unsigned char digest[]);
+ void SealObject(const ObjectID& object_id, unsigned char digest[]);
/// Check if the plasma store contains an object:
///
/// @param object_id Object ID that will be checked.
/// @return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if
/// not
- object_status contains_object(const ObjectID& object_id);
+ ObjectStatus ContainsObject(const ObjectID& object_id);
/// Record the fact that a particular client is no longer using an object.
///
/// @param object_id The object ID of the object that is being released.
/// @param client The client making this request.
- void release_object(const ObjectID& object_id, Client* client);
+ void ReleaseObject(const ObjectID& object_id, Client* client);
/// Subscribe a file descriptor to updates about new sealed objects.
///
/// @param client The client making this request.
- void subscribe_to_updates(Client* client);
+ void SubscribeToUpdates(Client* client);
/// Connect a new client to the PlasmaStore.
///
/// @param listener_sock The socket that is listening to incoming connections.
- void connect_client(int listener_sock);
+ void ConnectClient(int listener_sock);
/// Disconnect a client from the PlasmaStore.
///
/// @param client_fd The client file descriptor that is disconnected.
- void disconnect_client(int client_fd);
+ void DisconnectClient(int client_fd);
- NotificationMap::iterator send_notifications(NotificationMap::iterator it);
+ NotificationMap::iterator SendNotifications(NotificationMap::iterator it);
- Status process_message(Client* client);
+ Status ProcessMessage(Client* client);
private:
- void push_notification(ObjectInfoT* object_notification);
+ void PushNotification(ObjectInfoT* object_notification);
- void push_notification(ObjectInfoT* object_notification, int client_fd);
+ void PushNotification(ObjectInfoT* object_notification, int client_fd);
- void add_to_client_object_ids(ObjectTableEntry* entry, Client* client);
+ void AddToClientObjectIds(ObjectTableEntry* entry, Client* client);
- void return_from_get(GetRequest* get_req);
+ void ReturnFromGet(GetRequest* get_req);
- void update_object_get_requests(const ObjectID& object_id);
+ void UpdateObjectGetRequests(const ObjectID& object_id);
- int remove_from_client_object_ids(ObjectTableEntry* entry, Client* client);
+ int RemoveFromClientObjectIds(ObjectTableEntry* entry, Client* client);
/// Event loop of the plasma store.
EventLoop* loop_;
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index 1b445f2..97eb15e 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -25,6 +25,8 @@
#include "gtest/gtest.h"
+namespace fb = plasma::flatbuf;
+
namespace plasma {
/**
@@ -330,9 +332,9 @@ TEST(PlasmaSerialization, WaitRequest) {
const int num_objects_in = 2;
ObjectRequest object_requests_in[num_objects_in] = {
ObjectRequest({ObjectID::from_random(), ObjectRequestType::PLASMA_QUERY_ANYWHERE,
- ObjectStatus::Local}),
+ fb::ObjectStatus::Local}),
ObjectRequest({ObjectID::from_random(), ObjectRequestType::PLASMA_QUERY_LOCAL,
- ObjectStatus::Local})};
+ fb::ObjectStatus::Local})};
const int num_ready_objects_in = 1;
int64_t timeout_ms = 1000;
@@ -364,11 +366,11 @@ TEST(PlasmaSerialization, WaitReply) {
/* Create a map with two ObjectRequests in it. */
ObjectRequestMap objects_in(num_objects_in);
ObjectID id1 = ObjectID::from_random();
- objects_in[id1] =
- ObjectRequest({id1, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectStatus::Local});
+ objects_in[id1] = ObjectRequest(
+ {id1, ObjectRequestType::PLASMA_QUERY_LOCAL, fb::ObjectStatus::Local});
ObjectID id2 = ObjectID::from_random();
objects_in[id2] = ObjectRequest(
- {id2, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectStatus::Nonexistent});
+ {id2, ObjectRequestType::PLASMA_QUERY_LOCAL, fb::ObjectStatus::Nonexistent});
ARROW_CHECK_OK(SendWaitReply(fd, objects_in, num_objects_in));
/* Read message back. */