You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@arrow.apache.org by we...@apache.org on 2018/07/19 04:42:38 UTC

[arrow] branch master updated: ARROW-2810: [Plasma] Remove flatbuffers from public API

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 35ef303  ARROW-2810: [Plasma] Remove flatbuffers from public API
35ef303 is described below

commit 35ef303ad4f5a1f7a7e156e94ef331b7f9586ca5
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Thu Jul 19 00:42:34 2018 -0400

    ARROW-2810: [Plasma] Remove flatbuffers from public API
    
    Author: Philipp Moritz <pc...@gmail.com>
    Author: Wes McKinney <we...@apache.org>
    
    Closes #2282 from pcmoritz/plasma-flatbuffer-cleanup and squashes the following commits:
    
    60c3ba00 <Philipp Moritz> make plasma.h non-public
    d6c14978 <Wes McKinney> Do not compile if flatbuffers/flatbuffers.h is included
    cb059e80 <Wes McKinney> Use updated API in JNI implementation
    3eb6c676 <Philipp Moritz> fix python wrappers
    a4125497 <Philipp Moritz> linting
    c3daac78 <Philipp Moritz> remove flatbuffers from public interface
---
 cpp/src/plasma/CMakeLists.txt                      |  4 ---
 cpp/src/plasma/client.cc                           |  2 +-
 cpp/src/plasma/common.cc                           | 16 -----------
 cpp/src/plasma/common.h                            | 26 ++++-------------
 .../org_apache_arrow_plasma_PlasmaClientJNI.cc     |  4 +--
 cpp/src/plasma/protocol.cc                         | 33 +++++++++++++++++++---
 cpp/src/plasma/test/serialization_tests.cc         | 12 ++++----
 python/pyarrow/_plasma.pyx                         | 11 ++++----
 8 files changed, 50 insertions(+), 58 deletions(-)

diff --git a/cpp/src/plasma/CMakeLists.txt b/cpp/src/plasma/CMakeLists.txt
index aa4d2b0..bb6e514 100644
--- a/cpp/src/plasma/CMakeLists.txt
+++ b/cpp/src/plasma/CMakeLists.txt
@@ -140,13 +140,9 @@ endif()
 # Headers: top level
 install(FILES
   common.h
-  "${OUTPUT_DIR}/common_generated.h"
   compat.h
   client.h
   events.h
-  plasma.h
-  "${OUTPUT_DIR}/plasma_generated.h"
-  protocol.h
   DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/plasma")
 
 # Plasma store
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index f2b0b97..e7d9d44 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -982,7 +982,7 @@ Status PlasmaClient::Impl::Wait(int64_t num_object_requests,
   *num_objects_ready = 0;
   for (int i = 0; i < num_object_requests; ++i) {
     ObjectRequestType type = object_requests[i].type;
-    fb::ObjectStatus status = object_requests[i].status;
+    auto status = static_cast<fb::ObjectStatus>(object_requests[i].location);
     switch (type) {
       case ObjectRequestType::PLASMA_QUERY_LOCAL:
         if (status == fb::ObjectStatus::Local) {
diff --git a/cpp/src/plasma/common.cc b/cpp/src/plasma/common.cc
index f91b963..6e368d3 100644
--- a/cpp/src/plasma/common.cc
+++ b/cpp/src/plasma/common.cc
@@ -125,22 +125,6 @@ bool UniqueID::operator==(const UniqueID& rhs) const {
   return std::memcmp(data(), rhs.data(), kUniqueIDSize) == 0;
 }
 
-Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
-  switch (plasma_error) {
-    case fb::PlasmaError::OK:
-      return Status::OK();
-    case fb::PlasmaError::ObjectExists:
-      return Status::PlasmaObjectExists("object already exists in the plasma store");
-    case fb::PlasmaError::ObjectNonexistent:
-      return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
-    case fb::PlasmaError::OutOfMemory:
-      return Status::PlasmaStoreFull("object does not fit in the plasma store");
-    default:
-      ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
-  }
-  return Status::OK();
-}
-
 ARROW_EXPORT fb::ObjectStatus ObjectStatusLocal = fb::ObjectStatus::Local;
 ARROW_EXPORT fb::ObjectStatus ObjectStatusRemote = fb::ObjectStatus::Remote;
 
diff --git a/cpp/src/plasma/common.h b/cpp/src/plasma/common.h
index 5e7c7d4..a41bf72 100644
--- a/cpp/src/plasma/common.h
+++ b/cpp/src/plasma/common.h
@@ -33,13 +33,7 @@
 
 namespace plasma {
 
-namespace flatbuf {
-
-// Forward declaration outside the namespace, which is defined in plasma_generated.h.
-enum class PlasmaError : int32_t;
-enum class ObjectStatus : int32_t;
-
-}  // namespace flatbuf
+enum class ObjectLocation : int32_t { Local, Remote, Nonexistent };
 
 constexpr int64_t kUniqueIDSize = 20;
 
@@ -62,8 +56,6 @@ static_assert(std::is_pod<UniqueID>::value, "UniqueID must be plain old data");
 
 typedef UniqueID ObjectID;
 
-arrow::Status PlasmaErrorStatus(flatbuf::PlasmaError plasma_error);
-
 /// Size of object hash digests.
 constexpr int64_t kDigestSize = sizeof(uint64_t);
 
@@ -84,19 +76,13 @@ struct ObjectRequest {
   ///  - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
   ///    the system (i.e., either in the local or a remote Plasma Store).
   ObjectRequestType type;
-  /// Object status. Same as the status returned by plasma_status() function
-  /// call. This is filled in by plasma_wait_for_objects1():
-  ///  - ObjectStatus::Local: object is ready at the local Plasma Store.
-  ///  - ObjectStatus::Remote: object is ready at a remote Plasma Store.
-  ///  - ObjectStatus::Nonexistent: object does not exist in the system.
-  ///  - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
-  ///    for being transferred or it is transferring.
-  flatbuf::ObjectStatus status;
+  /// Object location. This can be
+  ///  - ObjectLocation::Local: object is ready at the local Plasma Store.
+  ///  - ObjectLocation::Remote: object is ready at a remote Plasma Store.
+  ///  - ObjectLocation::Nonexistent: object does not exist in the system.
+  ObjectLocation location;
 };
 
-extern flatbuf::ObjectStatus ObjectStatusLocal;
-extern flatbuf::ObjectStatus ObjectStatusRemote;
-
 /// Globally accessible reference to plasma store configuration.
 /// TODO(pcm): This can be avoided with some refactoring of existing code
 /// by making it possible to pass a context object through dlmalloc.
diff --git a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
index 270e696..4a3f6b2 100644
--- a/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
+++ b/cpp/src/plasma/lib/java/org_apache_arrow_plasma_PlasmaClientJNI.cc
@@ -271,8 +271,8 @@ JNIEXPORT jobjectArray JNICALL Java_org_apache_arrow_plasma_PlasmaClientJNI_wait
       break;
     }
 
-    if (oreqs[i].status == plasma::ObjectStatusLocal ||
-        oreqs[i].status == plasma::ObjectStatusRemote) {
+    if (oreqs[i].location == plasma::ObjectLocation::Local ||
+        oreqs[i].location == plasma::ObjectLocation::Remote) {
       oid = env->NewByteArray(OBJECT_ID_SIZE);
       object_id_to_jbyteArray(env, oid, &oreqs[i].object_id);
       env->SetObjectArrayElement(ret, num_returned, oid);
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index f5ea42a..5f761c4 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -37,6 +37,13 @@ using fb::PlasmaObjectSpec;
 
 using flatbuffers::uoffset_t;
 
+#define PLASMA_CHECK_ENUM(x, y) \
+  static_assert(static_cast<int>(x) == static_cast<int>(y), "protocol mismatch")
+
+PLASMA_CHECK_ENUM(ObjectLocation::Local, fb::ObjectStatus::Local);
+PLASMA_CHECK_ENUM(ObjectLocation::Remote, fb::ObjectStatus::Remote);
+PLASMA_CHECK_ENUM(ObjectLocation::Nonexistent, fb::ObjectStatus::Nonexistent);
+
 flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
 ToFlatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
              int64_t num_objects) {
@@ -75,6 +82,22 @@ Status PlasmaSend(int sock, MessageType message_type, flatbuffers::FlatBufferBui
   return WriteMessage(sock, message_type, fbb->GetSize(), fbb->GetBufferPointer());
 }
 
+Status PlasmaErrorStatus(fb::PlasmaError plasma_error) {
+  switch (plasma_error) {
+    case fb::PlasmaError::OK:
+      return Status::OK();
+    case fb::PlasmaError::ObjectExists:
+      return Status::PlasmaObjectExists("object already exists in the plasma store");
+    case fb::PlasmaError::ObjectNonexistent:
+      return Status::PlasmaObjectNonexistent("object does not exist in the plasma store");
+    case fb::PlasmaError::OutOfMemory:
+      return Status::PlasmaStoreFull("object does not fit in the plasma store");
+    default:
+      ARROW_LOG(FATAL) << "unknown plasma error code " << static_cast<int>(plasma_error);
+  }
+  return Status::OK();
+}
+
 // Create messages.
 
 Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
@@ -579,7 +602,7 @@ Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requ
     ObjectRequest object_request(
         {object_id,
          static_cast<ObjectRequestType>(message->object_requests()->Get(i)->type()),
-         fb::ObjectStatus::Nonexistent});
+         ObjectLocation::Nonexistent});
     object_requests[object_id] = object_request;
   }
   return Status::OK();
@@ -592,8 +615,9 @@ Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
   std::vector<flatbuffers::Offset<fb::ObjectReply>> object_replies;
   for (const auto& entry : object_requests) {
     const auto& object_request = entry.second;
-    object_replies.push_back(fb::CreateObjectReply(
-        fbb, fbb.CreateString(object_request.object_id.binary()), object_request.status));
+    object_replies.push_back(
+        fb::CreateObjectReply(fbb, fbb.CreateString(object_request.object_id.binary()),
+                              static_cast<fb::ObjectStatus>(object_request.location)));
   }
 
   auto message = fb::CreatePlasmaWaitReply(
@@ -611,7 +635,8 @@ Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[]
   for (int i = 0; i < *num_ready_objects; i++) {
     object_requests[i].object_id =
         ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str());
-    object_requests[i].status = message->object_requests()->Get(i)->status();
+    object_requests[i].location =
+        static_cast<ObjectLocation>(message->object_requests()->Get(i)->status());
   }
   return Status::OK();
 }
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
index 97eb15e..15df7de 100644
--- a/cpp/src/plasma/test/serialization_tests.cc
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -332,9 +332,9 @@ TEST(PlasmaSerialization, WaitRequest) {
   const int num_objects_in = 2;
   ObjectRequest object_requests_in[num_objects_in] = {
       ObjectRequest({ObjectID::from_random(), ObjectRequestType::PLASMA_QUERY_ANYWHERE,
-                     fb::ObjectStatus::Local}),
+                     ObjectLocation::Local}),
       ObjectRequest({ObjectID::from_random(), ObjectRequestType::PLASMA_QUERY_LOCAL,
-                     fb::ObjectStatus::Local})};
+                     ObjectLocation::Local})};
   const int num_ready_objects_in = 1;
   int64_t timeout_ms = 1000;
 
@@ -366,11 +366,11 @@ TEST(PlasmaSerialization, WaitReply) {
   /* Create a map with two ObjectRequests in it. */
   ObjectRequestMap objects_in(num_objects_in);
   ObjectID id1 = ObjectID::from_random();
-  objects_in[id1] = ObjectRequest(
-      {id1, ObjectRequestType::PLASMA_QUERY_LOCAL, fb::ObjectStatus::Local});
+  objects_in[id1] =
+      ObjectRequest({id1, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectLocation::Local});
   ObjectID id2 = ObjectID::from_random();
   objects_in[id2] = ObjectRequest(
-      {id2, ObjectRequestType::PLASMA_QUERY_LOCAL, fb::ObjectStatus::Nonexistent});
+      {id2, ObjectRequestType::PLASMA_QUERY_LOCAL, ObjectLocation::Nonexistent});
 
   ARROW_CHECK_OK(SendWaitReply(fd, objects_in, num_objects_in));
   /* Read message back. */
@@ -386,7 +386,7 @@ TEST(PlasmaSerialization, WaitReply) {
     const auto& entry = objects_in.find(objects_out[i].object_id);
     ASSERT_TRUE(entry != objects_in.end());
     ASSERT_EQ(entry->second.object_id, objects_out[i].object_id);
-    ASSERT_EQ(entry->second.status, objects_out[i].status);
+    ASSERT_EQ(entry->second.location, objects_out[i].location);
   }
   close(fd);
 }
diff --git a/python/pyarrow/_plasma.pyx b/python/pyarrow/_plasma.pyx
index 1e2cb66..ba52822 100644
--- a/python/pyarrow/_plasma.pyx
+++ b/python/pyarrow/_plasma.pyx
@@ -56,7 +56,7 @@ cdef extern from "plasma/common.h" nogil:
     cdef struct CObjectRequest" plasma::ObjectRequest":
         CUniqueID object_id
         int type
-        int status
+        int location
 
 
 cdef extern from "plasma/common.h":
@@ -66,8 +66,9 @@ cdef extern from "plasma/common.h":
         PLASMA_QUERY_LOCAL"plasma::ObjectRequestType::PLASMA_QUERY_LOCAL",
         PLASMA_QUERY_ANYWHERE"plasma::ObjectRequestType::PLASMA_QUERY_ANYWHERE"
 
-    cdef int ObjectStatusLocal"plasma::ObjectStatusLocal"
-    cdef int ObjectStatusRemote"plasma::ObjectStatusRemote"
+    cdef enum ObjectLocation:
+        ObjectStatusLocal"plasma::ObjectLocation::Local"
+        ObjectStatusRemote"plasma::ObjectLocation::Remote"
 
 cdef extern from "plasma/client.h" nogil:
 
@@ -598,8 +599,8 @@ cdef class PlasmaClient:
         for i in range(len(object_ids)):
             if num_returned == num_to_return:
                 break
-            if (object_requests[i].status == ObjectStatusLocal or
-                    object_requests[i].status == ObjectStatusRemote):
+            if (object_requests[i].location == ObjectStatusLocal or
+                    object_requests[i].location == ObjectStatusRemote):
                 ready_ids.append(
                     ObjectID(object_requests[i].object_id.binary()))
                 waiting_ids.discard(