You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by pc...@apache.org on 2017/11/08 22:13:32 UTC

[arrow] branch master updated: ARROW-1775: Ability to abort created but unsealed Plasma objects

This is an automated email from the ASF dual-hosted git repository.

pcmoritz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 78872a1  ARROW-1775: Ability to abort created but unsealed Plasma objects
78872a1 is described below

commit 78872a1be263e61d7901eb36663a184c2b04effb
Author: Stephanie <sw...@cs.berkeley.edu>
AuthorDate: Wed Nov 8 14:13:28 2017 -0800

    ARROW-1775: Ability to abort created but unsealed Plasma objects
    
    Author: Stephanie <sw...@cs.berkeley.edu>
    Author: Philipp Moritz <pc...@gmail.com>
    
    Closes #1289 from stephanie-wang/abort-objects and squashes the following commits:
    
    38c42b9 [Stephanie] TODO for PascalCase
    08d4040 [Stephanie] Move documentation
    dd5b29e [Stephanie] Fix memory error
    e6934ac [Philipp Moritz] fix linting
    2b8e385 [Stephanie] Return status code when unmapping object
    fe20b3b [Stephanie] Add test case for PlasmaClient::Abort
    646190c [Stephanie] Abort objects that were not sealed when client disconnects
    5fc44c5 [Stephanie] Implement PlasmaClient::Abort
---
 cpp/src/plasma/client.cc            | 97 +++++++++++++++++++++++++++++--------
 cpp/src/plasma/client.h             | 19 ++++++++
 cpp/src/plasma/format/plasma.fbs    | 12 +++++
 cpp/src/plasma/protocol.cc          | 28 +++++++++++
 cpp/src/plasma/protocol.h           |  8 +++
 cpp/src/plasma/store.cc             | 23 ++++++++-
 cpp/src/plasma/store.h              |  3 ++
 cpp/src/plasma/test/client_tests.cc | 44 +++++++++++++++++
 8 files changed, 213 insertions(+), 21 deletions(-)

diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index e57a2a6..dd32bdc 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -278,6 +278,39 @@ Status PlasmaClient::Get(const ObjectID* object_ids, int64_t num_objects,
   return Status::OK();
 }
 
+Status PlasmaClient::UnmapObject(const ObjectID& object_id) {
+  auto in_use = objects_in_use_.find(object_id);
+  ARROW_CHECK(in_use != objects_in_use_.end());
+  ARROW_CHECK(in_use->second->count == 0);
+  const auto& plasma_object = in_use->second->object;
+
+  // mmap_table_ counts, per memory-mapped store file, how many in-use objects
+  // this client has in that file. The matching increment is expected to have
+  // happened when the object was obtained (Get/Create).
+  const int store_fd = plasma_object.handle.store_fd;
+  auto mapping = mmap_table_.find(store_fd);
+  ARROW_CHECK(mapping != mmap_table_.end());
+  ARROW_CHECK(mapping->second.count >= 1);
+  if (mapping->second.count > 1) {
+    // Other in-use objects share this file: only drop this object's share.
+    mapping->second.count -= 1;
+  } else {
+    // Last in-use object in the file: unmap the file and forget the mapping.
+    // If munmap fails, nothing is updated and IOError is returned.
+    if (munmap(mapping->second.pointer, mapping->second.length) == -1) {
+      return Status::IOError("Error during munmap");
+    }
+    mmap_table_.erase(mapping);
+  }
+  // This client no longer holds the object's data and metadata bytes.
+  in_use_object_bytes_ -=
+      (plasma_object.data_size + plasma_object.metadata_size);
+  DCHECK_GE(in_use_object_bytes_, 0);
+  // Stop tracking the object; plasma_object dangles after this erase.
+  objects_in_use_.erase(in_use);
+  return Status::OK();
+}
+
 /// This is a helper method for implementing plasma_release. We maintain a
 /// buffer
 /// of release calls and only perform them once the buffer becomes full (as
@@ -297,28 +330,9 @@ Status PlasmaClient::PerformRelease(const ObjectID& object_id) {
   ARROW_CHECK(object_entry->second->count >= 0);
   // Check if the client is no longer using this object.
   if (object_entry->second->count == 0) {
-    // Decrement the count of the number of objects in this memory-mapped file
-    // that the client is using. The corresponding increment should have
-    // happened in plasma_get.
-    int fd = object_entry->second->object.handle.store_fd;
-    auto entry = mmap_table_.find(fd);
-    ARROW_CHECK(entry != mmap_table_.end());
-    entry->second.count -= 1;
-    ARROW_CHECK(entry->second.count >= 0);
-    // If none are being used then unmap the file.
-    if (entry->second.count == 0) {
-      munmap(entry->second.pointer, entry->second.length);
-      // Remove the corresponding entry from the hash table.
-      mmap_table_.erase(fd);
-    }
     // Tell the store that the client no longer needs the object.
+    RETURN_NOT_OK(UnmapObject(object_id));
     RETURN_NOT_OK(SendReleaseRequest(store_conn_, object_id));
-    // Update the in_use_object_bytes_.
-    in_use_object_bytes_ -= (object_entry->second->object.data_size +
-                             object_entry->second->object.metadata_size);
-    DCHECK_GE(in_use_object_bytes_, 0);
-    // Remove the entry from the hash table of objects currently in use.
-    objects_in_use_.erase(object_id);
   }
   return Status::OK();
 }
@@ -344,6 +358,20 @@ Status PlasmaClient::Release(const ObjectID& object_id) {
   return Status::OK();
 }
 
+Status PlasmaClient::FlushReleaseHistory() {
+  // A disconnected client has no store to send releases to.
+  if (store_conn_ < 0) {
+    return Status::OK();
+  }
+  // Drain from the back; an entry is popped only once its release succeeds.
+  while (!release_history_.empty()) {
+    const auto& next_id = release_history_.back();
+    RETURN_NOT_OK(PerformRelease(next_id));
+    release_history_.pop_back();
+  }
+  return Status::OK();
+}
+
 // This method is used to query whether the plasma store contains an object.
 Status PlasmaClient::Contains(const ObjectID& object_id, bool* has_object) {
   // Check if we already have a reference to the object.
@@ -443,6 +471,35 @@ Status PlasmaClient::Seal(const ObjectID& object_id) {
   return Release(object_id);
 }
 
+Status PlasmaClient::Abort(const ObjectID& object_id) {
+  auto object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end())
+      << "Plasma client called abort on an object without a reference to it";
+  ARROW_CHECK(!object_entry->second->is_sealed)
+      << "Plasma client called abort on a sealed object";
+  // Flush pending releases first. PerformRelease may erase entries from
+  // objects_in_use_ (invalidating object_entry), so look the object up again.
+  RETURN_NOT_OK(FlushReleaseHistory());
+  object_entry = objects_in_use_.find(object_id);
+  ARROW_CHECK(object_entry != objects_in_use_.end())
+      << "Plasma client released every reference before calling abort";
+  // Only the reference that Seal would have released may remain; any other
+  // reference to the buffer must be released before calling Abort.
+  if (object_entry->second->count > 1) {
+    return Status::Invalid("Plasma client cannot have a reference to the buffer.");
+  }
+  RETURN_NOT_OK(SendAbortRequest(store_conn_, object_id));
+  // Drop the last reference, then unmap the object and stop tracking it.
+  object_entry->second->count--;
+  RETURN_NOT_OK(UnmapObject(object_id));
+  // Consume the store's acknowledgement of the abort.
+  std::vector<uint8_t> buffer;
+  ObjectID id;
+  int64_t type;
+  RETURN_NOT_OK(ReadMessage(store_conn_, &type, &buffer));
+  return ReadAbortReply(buffer.data(), buffer.size(), &id);
+}
+
 Status PlasmaClient::Delete(const ObjectID& object_id) {
   // TODO(rkn): In the future, we can use this method to give hints to the
   // eviction policy about when an object will no longer be needed.
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 1459424..89df2b0 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -152,6 +152,15 @@ class ARROW_EXPORT PlasmaClient {
   /// \return The return status.
   Status Contains(const ObjectID& object_id, bool* has_object);
 
+  /// Abort an unsealed object in the object store. If the abort succeeds, then
+  /// it will be as if the object was never created at all. The unsealed object
+  /// must have only a single reference (the one that would have been removed by
+  /// calling Seal) once pending releases have been flushed.
+  ///
+  /// \param object_id The ID of the object to abort.
+  /// \return Status::Invalid if the client still holds another reference.
+  Status Abort(const ObjectID& object_id);
+
   /// Seal an object in the object store. The object will be immutable after
   /// this
   /// call.
@@ -307,6 +316,16 @@ class ARROW_EXPORT PlasmaClient {
   int get_manager_fd();
 
  private:
+  /// Unmap an object once all of its references have gone away (through
+  /// Release or Abort) and stop tracking it as in use.
+  /// \param object_id The object ID whose data we should unmap.
+  /// \return IOError if munmap fails, otherwise OK.
+  Status UnmapObject(const ObjectID& object_id);
+
+  /// Perform every release call still buffered in the release history; this is
+  /// a no-op when the client is disconnected. \return The first failure or OK.
+  Status FlushReleaseHistory();
+
   Status PerformRelease(const ObjectID& object_id);
 
   uint8_t* lookup_or_mmap(int fd, int store_fd_val, int64_t map_size);
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 23782ad..b6d03b8 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -21,6 +21,8 @@ enum MessageType:int {
   // Create a new object.
   PlasmaCreateRequest = 1,
   PlasmaCreateReply,
+  PlasmaAbortRequest,
+  PlasmaAbortReply,
   // Seal an object.
   PlasmaSealRequest,
   PlasmaSealReply,
@@ -113,6 +115,16 @@ table PlasmaCreateReply {
   error: PlasmaError;
 }
 
+table PlasmaAbortRequest {
+  // ID of the unsealed object the client wants the store to discard.
+  object_id: string;
+}
+
+table PlasmaAbortReply {
+  // ID of the aborted object, echoed back from the request.
+  object_id: string;
+}
+
 table PlasmaSealRequest {
   // ID of the object to be sealed.
   object_id: string;
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
index 2261b6a..c0ebb88 100644
--- a/cpp/src/plasma/protocol.cc
+++ b/cpp/src/plasma/protocol.cc
@@ -100,6 +100,34 @@ Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
   return plasma_error_status(message->error());
 }
 
+Status SendAbortRequest(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto msg = CreatePlasmaAbortRequest(builder, builder.CreateString(object_id.binary()));
+  return PlasmaSend(sock, MessageType_PlasmaAbortRequest, &builder, msg);
+}
+
+Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id) {
+  DCHECK(data);
+  const auto* request = flatbuffers::GetRoot<PlasmaAbortRequest>(data);
+  DCHECK(verify_flatbuffer(request, data, size));
+  *object_id = ObjectID::from_binary(request->object_id()->str());
+  return Status::OK();
+}
+
+Status SendAbortReply(int sock, ObjectID object_id) {
+  flatbuffers::FlatBufferBuilder builder;
+  auto msg = CreatePlasmaAbortReply(builder, builder.CreateString(object_id.binary()));
+  return PlasmaSend(sock, MessageType_PlasmaAbortReply, &builder, msg);
+}
+
+Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id) {
+  DCHECK(data);
+  const auto* reply = flatbuffers::GetRoot<PlasmaAbortReply>(data);
+  DCHECK(verify_flatbuffer(reply, data, size));
+  *object_id = ObjectID::from_binary(reply->object_id()->str());
+  return Status::OK();
+}
+
 // Seal messages.
 
 Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
index af4b139..e8c334f 100644
--- a/cpp/src/plasma/protocol.h
+++ b/cpp/src/plasma/protocol.h
@@ -51,6 +51,14 @@ Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int e
 Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
                        PlasmaObject* object);
 
+Status SendAbortRequest(int sock, ObjectID object_id);
+
+Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id);
+
+Status SendAbortReply(int sock, ObjectID object_id);
+
+Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id);
+
 /* Plasma Seal message functions. */
 
 Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 210cce1..5dbdebc 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -393,6 +393,18 @@ void PlasmaStore::seal_object(const ObjectID& object_id, unsigned char digest[])
   update_object_get_requests(object_id);
 }
 
+void PlasmaStore::abort_object(const ObjectID& object_id) {
+  auto entry = get_object_table_entry(&store_info_, object_id);
+  ARROW_CHECK(entry != nullptr) << "To abort an object it must be in the object table.";
+  ARROW_CHECK(entry->state != PLASMA_SEALED)
+      << "To abort an object it must not have been sealed.";
+  ARROW_CHECK(entry->clients.size() == 1)
+      << "To abort an object, the only client currently using it is the creator.";
+  // Free the buffer before erasing: entry is presumably owned by the table.
+  dlfree(entry->pointer);
+  store_info_.objects.erase(object_id);
+}
+
 void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
   for (const auto& object_id : object_ids) {
     ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
@@ -443,7 +455,11 @@ void PlasmaStore::disconnect_client(int client_fd) {
   // If this client was using any objects, remove it from the appropriate
   // lists.
   for (const auto& entry : store_info_.objects) {
-    remove_client_from_object_clients(entry.second.get(), it->second.get());
+    if (entry.second->state == PLASMA_SEALED) {
+      remove_client_from_object_clients(entry.second.get(), it->second.get());
+    } else {
+      abort_object(entry.first);
+    }
   }
 
   // Note, the store may still attempt to send a message to the disconnected
@@ -582,6 +598,11 @@ Status PlasmaStore::process_message(Client* client) {
         warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd);
       }
     } break;
+    case MessageType_PlasmaAbortRequest: {
+      RETURN_NOT_OK(ReadAbortRequest(input, input_size, &object_id));
+      abort_object(object_id);
+      HANDLE_SIGPIPE(SendAbortReply(client->fd, object_id), client->fd);
+    } break;
     case MessageType_PlasmaGetRequest: {
       std::vector<ObjectID> object_ids_to_get;
       int64_t timeout_ms;
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
index d03d11f..0d08d8a 100644
--- a/cpp/src/plasma/store.h
+++ b/cpp/src/plasma/store.h
@@ -48,6 +48,7 @@ struct Client {
 
 class PlasmaStore {
  public:
+  // TODO: PascalCase PlasmaStore methods.
   PlasmaStore(EventLoop* loop, int64_t system_memory, std::string directory,
               bool hugetlbfs_enabled);
 
@@ -73,6 +74,8 @@ class PlasmaStore {
   int create_object(const ObjectID& object_id, int64_t data_size, int64_t metadata_size,
                     Client* client, PlasmaObject* result);
 
+  void abort_object(const ObjectID& object_id);
+
   /// Delete objects that have been created in the hash table. This should only
   /// be called on objects that are returned by the eviction policy to evict.
   ///
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index 0f19da5..5c0cee4 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -127,6 +127,50 @@ TEST_F(TestPlasmaStore, MultipleGetTest) {
   ASSERT_EQ(object_buffer[1].data[0], 2);
 }
 
+TEST_F(TestPlasmaStore, AbortTest) {
+  ObjectID object_id = ObjectID::from_random();
+  ObjectBuffer object_buffer;
+  // Object layout shared by both creations below: 4 data bytes and a
+  // single metadata byte.
+  const int64_t data_size = 4;
+  uint8_t metadata[] = {5};
+  const int64_t metadata_size = sizeof(metadata);
+  uint8_t* data;
+
+  // The object must not exist before it is created.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+  ASSERT_EQ(object_buffer.data_size, -1);
+
+  // Create the object and fill only the first half of its data.
+  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+  for (int64_t idx = 0; idx < data_size / 2; ++idx) {
+    data[idx] = static_cast<uint8_t>(idx % 4);
+  }
+  // Abort must fail with Invalid until the reference handed out by Create
+  // has been released; after the release it must succeed.
+  Status abort_status = client_.Abort(object_id);
+  ASSERT_TRUE(abort_status.IsInvalid());
+  ARROW_CHECK_OK(client_.Release(object_id));
+  ARROW_CHECK_OK(client_.Abort(object_id));
+
+  // After the abort the object must be gone again.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, 0, &object_buffer));
+  ASSERT_EQ(object_buffer.data_size, -1);
+
+  // The same ID can be reused: create, fully write and seal the object.
+  ARROW_CHECK_OK(client_.Create(object_id, data_size, metadata, metadata_size, &data));
+  for (int64_t idx = 0; idx < data_size; ++idx) {
+    data[idx] = static_cast<uint8_t>(idx % 4);
+  }
+  ARROW_CHECK_OK(client_.Seal(object_id));
+
+  // A Get with a -1 timeout returns the sealed object holding our bytes.
+  ARROW_CHECK_OK(client_.Get(&object_id, 1, -1, &object_buffer));
+  for (int64_t idx = 0; idx < data_size; ++idx) {
+    ASSERT_EQ(data[idx], object_buffer.data[idx]);
+  }
+}
+
 }  // namespace plasma
 
 int main(int argc, char** argv) {

-- 
To stop receiving notification emails like this one, please contact
commits@arrow.apache.org.