You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/07/19 04:42:38 UTC
[arrow] branch master updated: ARROW-2810: [Plasma] Remove
flatbuffers from public API
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 35ef303 ARROW-2810: [Plasma] Remove flatbuffers from public API
35ef303 is described below
commit 35ef303ad4f5a1f7a7e156e94ef331b7f9586ca5
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Thu Jul 19 00:42:34 2018 -0400
ARROW-2810: [Plasma] Remove flatbuffers from public API
Author: Philipp Moritz <pc...@gmail.com>
Author: Wes McKinney <we...@apache.org>
Closes #2282 from pcmoritz/plasma-flatbuffer-cleanup and squashes the following commits:
60c3ba00 <Philipp Moritz> make plasma.h non-public
d6c14978 <Wes McKinney> Do not compile if flatbuffers/flatbuffers.h is included
cb059e80 <Wes McKinney> Use updated API in JNI implementation
3eb6c676 <Philipp Moritz> fix python wrappers
a4125497 <Philipp Moritz> linting
c3daac78 <Philipp Moritz> remove flatbuffers from public interface
---
cpp/src/plasma/CMakeLists.txt | 4 ---
cpp/src/plasma/client.cc | 2 +-
cpp/src/plasma/common.cc | 16 -----------
cpp/src/plasma/common.h | 26 ++++-------------
.../org_apache_arrow_plasma_PlasmaClientJNI.cc | 4 +--
cpp/src/plasma/protocol.cc | 33 +++++++++++++++++++---
cpp/src/plasma/test/serialization_tests.cc | 12 ++++----
python/pyarrow/_plasma.pyx | 11 ++++----
8 files changed, 50 insertions(+), 58 deletions(-)
diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
index aa4d2b0..bb6e514 100644
--- a/cpp/src/plasma/CMakeLists.txt
+++ b/cpp/src/plasma/CMakeLists.txt
@@ -140,13 +140,9 @@ endif()
# Headers: top level
install(FILES
common.h
- "${OUTPUT_DIR}/common_generated.h"
compat.h
client.h
events.h
- plasma.h
- "${OUTPUT_DIR}/plasma_generated.h"
- protocol.h
DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/plasma")
# Plasma store
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index f2b0b97..e7d9d44 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -982,7 +982,7 @@ Status PlasmaClient::Impl::Wait(int64_t num_object_requests,
*num_objects_ready = 0;
for (int i = 0; i < num_object_requests; ++i) {
ObjectRequestType type = object_requests[i].type;
- fb::ObjectStatus status = object_requests[i].status;
+ auto status = static_cast<fb::ObjectStatus>(object_requests[i].location);
switch (type) {
case ObjectRequestType::PLASMA_QUERY_LOCAL:
if (status == fb::ObjectStatus::Local) {
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
index f91b963..6e368d3 100644
--- a/cpp/src/plasma/common.cc
+++ b/cpp/src/plasma/common.cc
@@ -125,22 +125,6 @@ bool UniqueID::operator==(const UniqueID& rhs) const {
return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
}
-Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
- switch (plasma_error) {
- case fb::PlasmaError::OK:
- return Status::OK();
- case fb::PlasmaError::ObjectExists:
- return Status::PlasmaObjectExists("object already exists in the plasma store");
- case fb::PlasmaError::ObjectNonexistent:
- return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
- case fb::PlasmaError::OutOfMemory:
- return Status::PlasmaStoreFull("object does not fit in the plasma store");
- default:
- ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
- }
- return Status::OK();
-}
-
ARROW_EXPORT fb::ObjectStatus ObjectStatusLocal = fb::ObjectStatus::Local;
ARROW_EXPORT fb::ObjectStatus ObjectStatusRemote = fb::ObjectStatus::Remote;
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index 5e7c7d4..a41bf72 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -33,13 +33,7 @@
namespace plasma {
-namespace flatbuf {
-
-// Forward declaration outside the namespace, which is defined in plasma_generated.h.
-enum class PlasmaError : int32_t;
-enum class ObjectStatus : int32_t;
-
-} // namespace flatbuf
+enum class ObjectLocation : int32_t { Local, Remote, Nonexistent };
constexpr int64_t kUniqueIDSize = 20;
@@ -62,8 +56,6 @@ static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");
typedef UniqueID ObjectID;
-arrow::Status PlasmaErrorStatus(flatbuf::PlasmaError plasma_error);
-
/// Size of object hash digests.
constexpr int64_t kDigestSize = sizeof(uint64_t);
@@ -84,19 +76,13 @@ struct ObjectRequest {
/// - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
/// the system (i.e., either in the local or a remote Plasma Store).
ObjectRequestType type;
- /// Object status. Same as the status returned by plasma_status() function
- /// call. This is filled in by plasma_wait_for_objects1():
- /// - ObjectStatus::Local: object is ready at the local Plasma Store.
- /// - ObjectStatus::Remote: object is ready at a remote Plasma Store.
- /// - ObjectStatus::Nonexistent: object does not exist in the system.
- /// - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
- /// for being transferred or it is transferring.
- flatbuf::ObjectStatus status;
+ /// Object location. This can be
+ /// - ObjectLocation::Local: object is ready at the local Plasma Store.
+ /// - ObjectLocation::Remote: object is ready at a remote Plasma Store.
+ /// - ObjectLocation::Nonexistent: object does not exist in the system.
+ ObjectLocation location;
};
-extern flatbuf::ObjectStatus ObjectStatusLocal;
-extern flatbuf::ObjectStatus ObjectStatusRemote;
-
/// Globally accessible reference to plasma store configuration.
/// TODO(pcm): This can be avoided with some refactoring of existing code
/// by making it possible to pass a context object through dlmalloc.
diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
index 270e696..4a3f6b2 100644
--- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
+++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
@@ -271,8 +271,8 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_wait
break;
}
- if (oreqs[i].status == plasma::ObjectStatusLocal ||
- oreqs[i].status == plasma::ObjectStatusRemote) {
+ if (oreqs[i].location == plasma::ObjectLocation::Local ||
+ oreqs[i].location == plasma::ObjectLocation::Remote) {
oid = env->NewByteArray(OBJECT_ID_SIZE);
object_id_to_jbyteArray(env, oid, &oreqs[i].object_id);
env->SetObjectArrayElement(ret, num_returned, oid);
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index f5ea42a..5f761c4 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -37,6 +37,13 @@ using fb::PlasmaObjectSpec;
using flatbuffers::uoffset_t;
+#define PLASMA_CHECK_ENUM(x, y) \
+ static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch")
+
+PLASMA_CHECK_ENUM(ObjectLocation::Local, fb::ObjectStatus::Local);
+PLASMA_CHECK_ENUM(ObjectLocation::Remote, fb::ObjectStatus::Remote);
+PLASMA_CHECK_ENUM(ObjectLocation::Nonexistent, fb::ObjectStatus::Nonexistent);
+
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
int64_t num_objects) {
@@ -75,6 +82,22 @@ Status PlasmaSend(int sock, MessageType message_type, flatbuffers::FlatBufferBui
return WriteMessage(sock, message_type, fbb->GetSize(), fbb->GetBufferPointer());
}
+Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
+ switch (plasma_error) {
+ case fb::PlasmaError::OK:
+ return Status::OK();
+ case fb::PlasmaError::ObjectExists:
+ return Status::PlasmaObjectExists("object already exists in the plasma store");
+ case fb::PlasmaError::ObjectNonexistent:
+ return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
+ case fb::PlasmaError::OutOfMemory:
+ return Status::PlasmaStoreFull("object does not fit in the plasma store");
+ default:
+ ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
+ }
+ return Status::OK();
+}
+
// Create messages.
Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
@@ -579,7 +602,7 @@ Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requ
ObjectRequest object_request(
{object_id,
static_cast<ObjectRequestType>(message->object_requests()->Get(i)->type()),
- fb::ObjectStatus::Nonexistent});
+ ObjectLocation::Nonexistent});
object_requests[object_id] = object_request;
}
return Status::OK();
@@ -592,8 +615,9 @@ Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
std::vector<flatbuffers::Offset<fb::ObjectReply>> object_replies;
for (const auto& entry : object_requests) {
const auto& object_request = entry.second;
- object_replies.push_back(fb::CreateObjectReply(
- fbb, fbb.CreateString(object_request.object_id.binary()), object_request.status));
+ object_replies.push_back(
+ fb::CreateObjectReply(fbb, fbb.CreateString(object_request.object_id.binary()),
+ static_cast<fb::ObjectStatus>(object_request.location)));
}
auto message = fb::CreatePlasmaWaitReply(
@@ -611,7 +635,8 @@ Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[]
for (int i = 0; i < *num_ready_objects; i++) {
object_requests[i].object_id =
ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str());
- object_requests[i].status = message->object_requests()->Get(i)->status();
+ object_requests[i].location =
+ static_cast<ObjectLocation>(message->object_requests()->Get(i)->status());
}
return Status::OK();
}
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index 97eb15e..15df7de 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -332,9 +332,9 @@ TEST(PlasmaSerialization, WaitRequest) {
const int num_objects_in = 2;
ObjectRequest object_requests_in[num_objects_in] = {
ObjectRequest({ObjectID::from_random(), ObjectRequestType::PLASMA_QUERY_ANYWHERE,
- fb::ObjectStatus::Local}),
+ ObjectLocation::Local}),
ObjectRequest({ObjectID::from_random(), ObjectRequestType::PLASMA_QUERY_LOCAL,
- fb::ObjectStatus::Local})};
+ ObjectLocation::Local})};
const int num_ready_objects_in = 1;
int64_t timeout_ms = 1000;
@@ -366,11 +366,11 @@ TEST(PlasmaSerialization, WaitReply) {
/* Create a map with two ObjectRequests in it. */
ObjectRequestMap objects_in(num_objects_in);
ObjectID id1 = ObjectID::from_random();
- objects_in[id1] = ObjectRequest(
- {id1, ObjectRequestType::PLASMA_QUERY_LOCAL, fb::ObjectStatus::Local});
+ objects_in[id1] =
+ ObjectRequest({id1, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectLocation::Local});
ObjectID id2 = ObjectID::from_random();
objects_in[id2] = ObjectRequest(
- {id2, ObjectRequestType::PLASMA_QUERY_LOCAL, fb::ObjectStatus::Nonexistent});
+ {id2, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectLocation::Nonexistent});
ARROW_CHECK_OK(SendWaitReply(fd, objects_in, num_objects_in));
/* Read message back. */
@@ -386,7 +386,7 @@ TEST(PlasmaSerialization, WaitReply) {
const auto& entry = objects_in.find(objects_out[i].object_id);
ASSERT_TRUE(entry != objects_in.end());
ASSERT_EQ(entry->second.object_id, objects_out[i].object_id);
- ASSERT_EQ(entry->second.status, objects_out[i].status);
+ ASSERT_EQ(entry->second.location, objects_out[i].location);
}
close(fd);
}
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 1e2cb66..ba52822 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -56,7 +56,7 @@ cdef extern from "plasma/common.h" nogil:
cdef struct CObjectRequest" plasma::ObjectRequest":
CUniqueID object_id
int type
- int status
+ int location
cdef extern from "plasma/common.h":
@@ -66,8 +66,9 @@ cdef extern from "plasma/common.h":
PLASMA_QUERY_LOCAL"plasma::ObjectRequestType::PLASMA_QUERY_LOCAL",
PLASMA_QUERY_ANYWHERE"plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE"
- cdef int ObjectStatusLocal"plasma::ObjectStatusLocal"
- cdef int ObjectStatusRemote"plasma::ObjectStatusRemote"
+ cdef enum ObjectLocation:
+ ObjectStatusLocal"plasma::ObjectLocation::Local"
+ ObjectStatusRemote"plasma::ObjectLocation::Remote"
cdef extern from "plasma/client.h" nogil:
@@ -598,8 +599,8 @@ cdef class PlasmaClient:
for i in range(len(object_ids)):
if num_returned == num_to_return:
break
- if (object_requests[i].status == ObjectStatusLocal or
- object_requests[i].status == ObjectStatusRemote):
+ if (object_requests[i].location == ObjectStatusLocal or
+ object_requests[i].location == ObjectStatusRemote):
ready_ids.append(
ObjectID(object_requests[i].object_id.binary()))
waiting_ids.discard(