You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/23 18:39:12 UTC
[05/14] arrow git commit: [C++] Remove Plasma source tree for 0.5.0
release pending IP Clearance
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/fling.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/fling.cc b/cpp/src/plasma/fling.cc
deleted file mode 100644
index 79da4f4..0000000
--- a/cpp/src/plasma/fling.cc
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright 2013 Sharvil Nanavati
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "plasma/fling.h"
-
-#include <string.h>
-
-void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) {
- iov->iov_base = buf;
- iov->iov_len = 1;
-
- msg->msg_iov = iov;
- msg->msg_iovlen = 1;
- msg->msg_control = buf;
- msg->msg_controllen = buf_len;
- msg->msg_name = NULL;
- msg->msg_namelen = 0;
-}
-
-int send_fd(int conn, int fd) {
- struct msghdr msg;
- struct iovec iov;
- char buf[CMSG_SPACE(sizeof(int))];
- memset(&buf, 0, CMSG_SPACE(sizeof(int)));
-
- init_msg(&msg, &iov, buf, sizeof(buf));
-
- struct cmsghdr* header = CMSG_FIRSTHDR(&msg);
- header->cmsg_level = SOL_SOCKET;
- header->cmsg_type = SCM_RIGHTS;
- header->cmsg_len = CMSG_LEN(sizeof(int));
- *reinterpret_cast<int*>(CMSG_DATA(header)) = fd;
-
- // Send file descriptor.
- ssize_t r = sendmsg(conn, &msg, 0);
- if (r >= 0) {
- return 0;
- } else {
- return static_cast<int>(r);
- }
-}
-
-int recv_fd(int conn) {
- struct msghdr msg;
- struct iovec iov;
- char buf[CMSG_SPACE(sizeof(int))];
- init_msg(&msg, &iov, buf, sizeof(buf));
-
- if (recvmsg(conn, &msg, 0) == -1) return -1;
-
- int found_fd = -1;
- int oh_noes = 0;
- for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL;
- header = CMSG_NXTHDR(&msg, header))
- if (header->cmsg_level == SOL_SOCKET && header->cmsg_type == SCM_RIGHTS) {
- ssize_t count =
- (header->cmsg_len - (CMSG_DATA(header) - (unsigned char*)header)) / sizeof(int);
- for (int i = 0; i < count; ++i) {
- int fd = (reinterpret_cast<int*>(CMSG_DATA(header)))[i];
- if (found_fd == -1) {
- found_fd = fd;
- } else {
- close(fd);
- oh_noes = 1;
- }
- }
- }
-
- // The sender sent us more than one file descriptor. We've closed
- // them all to prevent fd leaks but notify the caller that we got
- // a bad message.
- if (oh_noes) {
- close(found_fd);
- errno = EBADMSG;
- return -1;
- }
-
- return found_fd;
-}
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/fling.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/fling.h b/cpp/src/plasma/fling.h
deleted file mode 100644
index 78ac9d1..0000000
--- a/cpp/src/plasma/fling.h
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2013 Sharvil Nanavati
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// FLING: Exchanging file descriptors over sockets
-//
-// This is a little library for sending file descriptors over a socket
-// between processes. The reason for doing that (as opposed to using
-// filenames to share the files) is so (a) no files remain in the
-// filesystem after all the processes terminate, (b) to make sure that
-// there are no name collisions and (c) to be able to control who has
-// access to the data.
-//
-// Most of the code is from https://github.com/sharvil/flingfd
-
-#include <errno.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-// This is neccessary for Mac OS X, see http://www.apuebook.com/faqs2e.html
-// (10).
-#if !defined(CMSG_SPACE) && !defined(CMSG_LEN)
-#define CMSG_SPACE(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + __DARWIN_ALIGN32(len))
-#define CMSG_LEN(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + (len))
-#endif
-
-void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len);
-
-// Send a file descriptor over a unix domain socket.
-//
-// @param conn Unix domain socket to send the file descriptor over.
-// @param fd File descriptor to send over.
-// @return Status code which is < 0 on failure.
-int send_fd(int conn, int fd);
-
-// Receive a file descriptor over a unix domain socket.
-//
-// @param conn Unix domain socket to receive the file descriptor from.
-// @return File descriptor or a value < 0 on failure.
-int recv_fd(int conn);
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/format/.gitignore
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/.gitignore b/cpp/src/plasma/format/.gitignore
deleted file mode 100644
index b2ddb05..0000000
--- a/cpp/src/plasma/format/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-*_generated.h
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/format/common.fbs
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/common.fbs b/cpp/src/plasma/format/common.fbs
deleted file mode 100644
index 4d7d285..0000000
--- a/cpp/src/plasma/format/common.fbs
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Object information data structure.
-table ObjectInfo {
- // Object ID of this object.
- object_id: string;
- // Number of bytes the content of this object occupies in memory.
- data_size: long;
- // Number of bytes the metadata of this object occupies in memory.
- metadata_size: long;
- // Unix epoch of when this object was created.
- create_time: long;
- // How long creation of this object took.
- construct_duration: long;
- // Hash of the object content.
- digest: string;
- // Specifies if this object was deleted or added.
- is_deletion: bool;
-}
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/format/plasma.fbs
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
deleted file mode 100644
index 23782ad..0000000
--- a/cpp/src/plasma/format/plasma.fbs
+++ /dev/null
@@ -1,291 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Plasma protocol specification
-
-enum MessageType:int {
- // Create a new object.
- PlasmaCreateRequest = 1,
- PlasmaCreateReply,
- // Seal an object.
- PlasmaSealRequest,
- PlasmaSealReply,
- // Get an object that is stored on the local Plasma store.
- PlasmaGetRequest,
- PlasmaGetReply,
- // Release an object.
- PlasmaReleaseRequest,
- PlasmaReleaseReply,
- // Delete an object.
- PlasmaDeleteRequest,
- PlasmaDeleteReply,
- // Get status of an object.
- PlasmaStatusRequest,
- PlasmaStatusReply,
- // See if the store contains an object (will be deprecated).
- PlasmaContainsRequest,
- PlasmaContainsReply,
- // Get information for a newly connecting client.
- PlasmaConnectRequest,
- PlasmaConnectReply,
- // Make room for new objects in the plasma store.
- PlasmaEvictRequest,
- PlasmaEvictReply,
- // Fetch objects from remote Plasma stores.
- PlasmaFetchRequest,
- // Wait for objects to be ready either from local or remote Plasma stores.
- PlasmaWaitRequest,
- PlasmaWaitReply,
- // Subscribe to a list of objects or to all objects.
- PlasmaSubscribeRequest,
- // Unsubscribe.
- PlasmaUnsubscribeRequest,
- // Sending and receiving data.
- // PlasmaDataRequest initiates sending the data, there will be one
- // such message per data transfer.
- PlasmaDataRequest,
- // PlasmaDataReply contains the actual data and is sent back to the
- // object store that requested the data. For each transfer, multiple
- // reply messages get sent. Each one contains a fixed number of bytes.
- PlasmaDataReply,
- // Object notifications.
- PlasmaNotification
-}
-
-enum PlasmaError:int {
- // Operation was successful.
- OK,
- // Trying to create an object that already exists.
- ObjectExists,
- // Trying to access an object that doesn't exist.
- ObjectNonexistent,
- // Trying to create an object but there isn't enough space in the store.
- OutOfMemory
-}
-
-// Plasma store messages
-
-struct PlasmaObjectSpec {
- // Index of the memory segment (= memory mapped file) that
- // this object is allocated in.
- segment_index: int;
- // Size in bytes of this segment (needed to call mmap).
- mmap_size: ulong;
- // The offset in bytes in the memory mapped file of the data.
- data_offset: ulong;
- // The size in bytes of the data.
- data_size: ulong;
- // The offset in bytes in the memory mapped file of the metadata.
- metadata_offset: ulong;
- // The size in bytes of the metadata.
- metadata_size: ulong;
-}
-
-table PlasmaCreateRequest {
- // ID of the object to be created.
- object_id: string;
- // The size of the object's data in bytes.
- data_size: ulong;
- // The size of the object's metadata in bytes.
- metadata_size: ulong;
-}
-
-table PlasmaCreateReply {
- // ID of the object that was created.
- object_id: string;
- // The object that is returned with this reply.
- plasma_object: PlasmaObjectSpec;
- // Error that occurred for this call.
- error: PlasmaError;
-}
-
-table PlasmaSealRequest {
- // ID of the object to be sealed.
- object_id: string;
- // Hash of the object data.
- digest: string;
-}
-
-table PlasmaSealReply {
- // ID of the object that was sealed.
- object_id: string;
- // Error code.
- error: PlasmaError;
-}
-
-table PlasmaGetRequest {
- // IDs of the objects stored at local Plasma store we are getting.
- object_ids: [string];
- // The number of milliseconds before the request should timeout.
- timeout_ms: long;
-}
-
-table PlasmaGetReply {
- // IDs of the objects being returned.
- // This number can be smaller than the number of requested
- // objects if not all requested objects are stored and sealed
- // in the local Plasma store.
- object_ids: [string];
- // Plasma object information, in the same order as their IDs.
- plasma_objects: [PlasmaObjectSpec];
- // The number of elements in both object_ids and plasma_objects arrays must agree.
-}
-
-table PlasmaReleaseRequest {
- // ID of the object to be released.
- object_id: string;
-}
-
-table PlasmaReleaseReply {
- // ID of the object that was released.
- object_id: string;
- // Error code.
- error: PlasmaError;
-}
-
-table PlasmaDeleteRequest {
- // ID of the object to be deleted.
- object_id: string;
-}
-
-table PlasmaDeleteReply {
- // ID of the object that was deleted.
- object_id: string;
- // Error code.
- error: PlasmaError;
-}
-
-table PlasmaStatusRequest {
- // IDs of the objects stored at local Plasma store we request the status of.
- object_ids: [string];
-}
-
-enum ObjectStatus:int {
- // Object is stored in the local Plasma Store.
- Local = 1,
- // Object is stored on a remote Plasma store, and it is not stored on the
- // local Plasma Store.
- Remote,
- // Object is not stored in the system.
- Nonexistent,
- // Object is currently transferred from a remote Plasma store the the local
- // Plasma Store.
- Transfer
-}
-
-table PlasmaStatusReply {
- // IDs of the objects being returned.
- object_ids: [string];
- // Status of the object.
- status: [ObjectStatus];
-}
-
-// PlasmaContains is a subset of PlasmaStatus which does not
-// involve the plasma manager, only the store. We should consider
-// unifying them in the future and deprecating PlasmaContains.
-
-table PlasmaContainsRequest {
- // ID of the object we are querying.
- object_id: string;
-}
-
-table PlasmaContainsReply {
- // ID of the object we are querying.
- object_id: string;
- // 1 if the object is in the store and 0 otherwise.
- has_object: int;
-}
-
-// PlasmaConnect is used by a plasma client the first time it connects with the
-// store. This is not really necessary, but is used to get some information
-// about the store such as its memory capacity.
-
-table PlasmaConnectRequest {
-}
-
-table PlasmaConnectReply {
- // The memory capacity of the store.
- memory_capacity: long;
-}
-
-table PlasmaEvictRequest {
- // Number of bytes that shall be freed.
- num_bytes: ulong;
-}
-
-table PlasmaEvictReply {
- // Number of bytes that have been freed.
- num_bytes: ulong;
-}
-
-table PlasmaFetchRequest {
- // IDs of objects to be gotten.
- object_ids: [string];
-}
-
-table ObjectRequestSpec {
- // ID of the object.
- object_id: string;
- // The type of the object. This specifies whether we
- // will be waiting for an object store in the local or
- // global Plasma store.
- type: int;
-}
-
-table PlasmaWaitRequest {
- // Array of object requests whose status we are asking for.
- object_requests: [ObjectRequestSpec];
- // Number of objects expected to be returned, if available.
- num_ready_objects: int;
- // timeout
- timeout: long;
-}
-
-table ObjectReply {
- // ID of the object.
- object_id: string;
- // The object status. This specifies where the object is stored.
- status: int;
-}
-
-table PlasmaWaitReply {
- // Array of object requests being returned.
- object_requests: [ObjectReply];
- // Number of objects expected to be returned, if available.
- num_ready_objects: int;
-}
-
-table PlasmaSubscribeRequest {
-}
-
-table PlasmaDataRequest {
- // ID of the object that is requested.
- object_id: string;
- // The host address where the data shall be sent to.
- address: string;
- // The port of the manager the data shall be sent to.
- port: int;
-}
-
-table PlasmaDataReply {
- // ID of the object that will be sent.
- object_id: string;
- // Size of the object data in bytes.
- object_size: ulong;
- // Size of the metadata in bytes.
- metadata_size: ulong;
-}
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
deleted file mode 100644
index 5875ebb..0000000
--- a/cpp/src/plasma/io.cc
+++ /dev/null
@@ -1,212 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/io.h"
-
-#include "plasma/common.h"
-
-using arrow::Status;
-
-/* Number of times we try binding to a socket. */
-#define NUM_BIND_ATTEMPTS 5
-#define BIND_TIMEOUT_MS 100
-
-/* Number of times we try connecting to a socket. */
-#define NUM_CONNECT_ATTEMPTS 50
-#define CONNECT_TIMEOUT_MS 100
-
-Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
- ssize_t nbytes = 0;
- size_t bytesleft = length;
- size_t offset = 0;
- while (bytesleft > 0) {
- /* While we haven't written the whole message, write to the file descriptor,
- * advance the cursor, and decrease the amount left to write. */
- nbytes = write(fd, cursor + offset, bytesleft);
- if (nbytes < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; }
- return Status::IOError(std::string(strerror(errno)));
- } else if (nbytes == 0) {
- return Status::IOError("Encountered unexpected EOF");
- }
- ARROW_CHECK(nbytes > 0);
- bytesleft -= nbytes;
- offset += nbytes;
- }
-
- return Status::OK();
-}
-
-Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
- int64_t version = PLASMA_PROTOCOL_VERSION;
- RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)));
- RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&type), sizeof(type)));
- RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)));
- return WriteBytes(fd, bytes, length * sizeof(char));
-}
-
-Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
- ssize_t nbytes = 0;
- /* Termination condition: EOF or read 'length' bytes total. */
- size_t bytesleft = length;
- size_t offset = 0;
- while (bytesleft > 0) {
- nbytes = read(fd, cursor + offset, bytesleft);
- if (nbytes < 0) {
- if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { continue; }
- return Status::IOError(std::string(strerror(errno)));
- } else if (0 == nbytes) {
- return Status::IOError("Encountered unexpected EOF");
- }
- ARROW_CHECK(nbytes > 0);
- bytesleft -= nbytes;
- offset += nbytes;
- }
-
- return Status::OK();
-}
-
-Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) {
- int64_t version;
- RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)),
- *type = DISCONNECT_CLIENT);
- ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version;
- size_t length;
- RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)),
- *type = DISCONNECT_CLIENT);
- RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)),
- *type = DISCONNECT_CLIENT);
- if (length > buffer->size()) { buffer->resize(length); }
- RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), *type = DISCONNECT_CLIENT);
- return Status::OK();
-}
-
-int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
- struct sockaddr_un socket_address;
- int socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
- if (socket_fd < 0) {
- ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
- return -1;
- }
- /* Tell the system to allow the port to be reused. */
- int on = 1;
- if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&on),
- sizeof(on)) < 0) {
- ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname;
- close(socket_fd);
- return -1;
- }
-
- unlink(pathname.c_str());
- memset(&socket_address, 0, sizeof(socket_address));
- socket_address.sun_family = AF_UNIX;
- if (pathname.size() + 1 > sizeof(socket_address.sun_path)) {
- ARROW_LOG(ERROR) << "Socket pathname is too long.";
- close(socket_fd);
- return -1;
- }
- strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1);
-
- if (bind(socket_fd, (struct sockaddr*)&socket_address, sizeof(socket_address)) != 0) {
- ARROW_LOG(ERROR) << "Bind failed for pathname " << pathname;
- close(socket_fd);
- return -1;
- }
- if (shall_listen && listen(socket_fd, 128) == -1) {
- ARROW_LOG(ERROR) << "Could not listen to socket " << pathname;
- close(socket_fd);
- return -1;
- }
- return socket_fd;
-}
-
-int connect_ipc_sock_retry(
- const std::string& pathname, int num_retries, int64_t timeout) {
- /* Pick the default values if the user did not specify. */
- if (num_retries < 0) { num_retries = NUM_CONNECT_ATTEMPTS; }
- if (timeout < 0) { timeout = CONNECT_TIMEOUT_MS; }
-
- int fd = -1;
- for (int num_attempts = 0; num_attempts < num_retries; ++num_attempts) {
- fd = connect_ipc_sock(pathname);
- if (fd >= 0) { break; }
- if (num_attempts == 0) {
- ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << pathname;
- }
- /* Sleep for timeout milliseconds. */
- usleep(static_cast<int>(timeout * 1000));
- }
- /* If we could not connect to the socket, exit. */
- if (fd == -1) { ARROW_LOG(FATAL) << "Could not connect to socket " << pathname; }
- return fd;
-}
-
-int connect_ipc_sock(const std::string& pathname) {
- struct sockaddr_un socket_address;
- int socket_fd;
-
- socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
- if (socket_fd < 0) {
- ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
- return -1;
- }
-
- memset(&socket_address, 0, sizeof(socket_address));
- socket_address.sun_family = AF_UNIX;
- if (pathname.size() + 1 > sizeof(socket_address.sun_path)) {
- ARROW_LOG(ERROR) << "Socket pathname is too long.";
- return -1;
- }
- strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1);
-
- if (connect(socket_fd, (struct sockaddr*)&socket_address, sizeof(socket_address)) !=
- 0) {
- close(socket_fd);
- return -1;
- }
-
- return socket_fd;
-}
-
-int AcceptClient(int socket_fd) {
- int client_fd = accept(socket_fd, NULL, NULL);
- if (client_fd < 0) {
- ARROW_LOG(ERROR) << "Error reading from socket.";
- return -1;
- }
- return client_fd;
-}
-
-uint8_t* read_message_async(int sock) {
- int64_t size;
- Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
- if (!s.ok()) {
- /* The other side has closed the socket. */
- ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
- close(sock);
- return NULL;
- }
- uint8_t* message = reinterpret_cast<uint8_t*>(malloc(size));
- s = ReadBytes(sock, message, size);
- if (!s.ok()) {
- /* The other side has closed the socket. */
- ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
- close(sock);
- return NULL;
- }
- return message;
-}
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/io.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
deleted file mode 100644
index 43c3fb5..0000000
--- a/cpp/src/plasma/io.h
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_IO_H
-#define PLASMA_IO_H
-
-#include <inttypes.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#include <string>
-#include <vector>
-
-#include "arrow/status.h"
-
-// TODO(pcm): Replace our own custom message header (message type,
-// message length, plasma protocol verion) with one that is serialized
-// using flatbuffers.
-#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
-#define DISCONNECT_CLIENT 0
-
-arrow::Status WriteBytes(int fd, uint8_t* cursor, size_t length);
-
-arrow::Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes);
-
-arrow::Status ReadBytes(int fd, uint8_t* cursor, size_t length);
-
-arrow::Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);
-
-int bind_ipc_sock(const std::string& pathname, bool shall_listen);
-
-int connect_ipc_sock(const std::string& pathname);
-
-int connect_ipc_sock_retry(const std::string& pathname, int num_retries, int64_t timeout);
-
-int AcceptClient(int socket_fd);
-
-uint8_t* read_message_async(int sock);
-
-#endif // PLASMA_IO_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/malloc.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc
deleted file mode 100644
index 97c9a16..0000000
--- a/cpp/src/plasma/malloc.cc
+++ /dev/null
@@ -1,178 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/malloc.h"
-
-#include <assert.h>
-#include <stddef.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/mman.h>
-#include <unistd.h>
-
-#include <unordered_map>
-
-#include "plasma/common.h"
-
-extern "C" {
-void* fake_mmap(size_t);
-int fake_munmap(void*, int64_t);
-
-#define MMAP(s) fake_mmap(s)
-#define MUNMAP(a, s) fake_munmap(a, s)
-#define DIRECT_MMAP(s) fake_mmap(s)
-#define DIRECT_MUNMAP(a, s) fake_munmap(a, s)
-#define USE_DL_PREFIX
-#define HAVE_MORECORE 0
-#define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T
-#define DEFAULT_GRANULARITY ((size_t)128U * 1024U)
-
-#include "thirdparty/dlmalloc.c" // NOLINT
-
-#undef MMAP
-#undef MUNMAP
-#undef DIRECT_MMAP
-#undef DIRECT_MUNMAP
-#undef USE_DL_PREFIX
-#undef HAVE_MORECORE
-#undef DEFAULT_GRANULARITY
-}
-
-struct mmap_record {
- int fd;
- int64_t size;
-};
-
-namespace {
-
-/** Hashtable that contains one entry per segment that we got from the OS
- * via mmap. Associates the address of that segment with its file descriptor
- * and size. */
-std::unordered_map<void*, mmap_record> mmap_records;
-
-} /* namespace */
-
-constexpr int GRANULARITY_MULTIPLIER = 2;
-
-static void* pointer_advance(void* p, ptrdiff_t n) {
- return (unsigned char*)p + n;
-}
-
-static void* pointer_retreat(void* p, ptrdiff_t n) {
- return (unsigned char*)p - n;
-}
-
-static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) {
- return (unsigned char const*)pto - (unsigned char const*)pfrom;
-}
-
-/* Create a buffer. This is creating a temporary file and then
- * immediately unlinking it so we do not leave traces in the system. */
-int create_buffer(int64_t size) {
- int fd;
-#ifdef _WIN32
- if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE,
- (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), (DWORD)(uint64_t)size,
- NULL)) {
- fd = -1;
- }
-#else
-#ifdef __linux__
- constexpr char file_template[] = "/dev/shm/plasmaXXXXXX";
-#else
- constexpr char file_template[] = "/tmp/plasmaXXXXXX";
-#endif
- char file_name[32];
- strncpy(file_name, file_template, 32);
- fd = mkstemp(file_name);
- if (fd < 0) return -1;
- FILE* file = fdopen(fd, "a+");
- if (!file) {
- close(fd);
- return -1;
- }
- if (unlink(file_name) != 0) {
- ARROW_LOG(FATAL) << "unlink error";
- return -1;
- }
- if (ftruncate(fd, (off_t)size) != 0) {
- ARROW_LOG(FATAL) << "ftruncate error";
- return -1;
- }
-#endif
- return fd;
-}
-
-void* fake_mmap(size_t size) {
- /* Add sizeof(size_t) so that the returned pointer is deliberately not
- * page-aligned. This ensures that the segments of memory returned by
- * fake_mmap are never contiguous. */
- size += sizeof(size_t);
-
- int fd = create_buffer(size);
- ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
- void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
- if (pointer == MAP_FAILED) { return pointer; }
-
- /* Increase dlmalloc's allocation granularity directly. */
- mparams.granularity *= GRANULARITY_MULTIPLIER;
-
- mmap_record& record = mmap_records[pointer];
- record.fd = fd;
- record.size = size;
-
- /* We lie to dlmalloc about where mapped memory actually lives. */
- pointer = pointer_advance(pointer, sizeof(size_t));
- ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
- return pointer;
-}
-
-int fake_munmap(void* addr, int64_t size) {
- ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")";
- addr = pointer_retreat(addr, sizeof(size_t));
- size += sizeof(size_t);
-
- auto entry = mmap_records.find(addr);
-
- if (entry == mmap_records.end() || entry->second.size != size) {
- /* Reject requests to munmap that don't directly match previous
- * calls to mmap, to prevent dlmalloc from trimming. */
- return -1;
- }
-
- int r = munmap(addr, size);
- if (r == 0) { close(entry->second.fd); }
-
- mmap_records.erase(entry);
- return r;
-}
-
-void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) {
- /* TODO(rshin): Implement a more efficient search through mmap_records. */
- for (const auto& entry : mmap_records) {
- if (addr >= entry.first && addr < pointer_advance(entry.first, entry.second.size)) {
- *fd = entry.second.fd;
- *map_size = entry.second.size;
- *offset = pointer_distance(entry.first, addr);
- return;
- }
- }
- *fd = -1;
- *map_size = 0;
- *offset = 0;
-}
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/malloc.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h
deleted file mode 100644
index b4af2c8..0000000
--- a/cpp/src/plasma/malloc.h
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_MALLOC_H
-#define PLASMA_MALLOC_H
-
-#include <inttypes.h>
-#include <stddef.h>
-
-void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);
-
-#endif // MALLOC_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/plasma.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
deleted file mode 100644
index 559d8e7..0000000
--- a/cpp/src/plasma/plasma.cc
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/plasma.h"
-
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include "plasma/common.h"
-#include "plasma/protocol.h"
-
-int warn_if_sigpipe(int status, int client_sock) {
- if (status >= 0) { return 0; }
- if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {
- ARROW_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when "
- "sending a message to client on fd "
- << client_sock << ". The client on the other end may "
- "have hung up.";
- return errno;
- }
- ARROW_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << ".";
- return -1; // This is never reached.
-}
-
-/**
- * This will create a new ObjectInfo buffer. The first sizeof(int64_t) bytes
- * of this buffer are the length of the remaining message and the
- * remaining message is a serialized version of the object info.
- *
- * @param object_info The object info to be serialized
- * @return The object info buffer. It is the caller's responsibility to free
- * this buffer with "delete" after it has been used.
- */
-uint8_t* create_object_info_buffer(ObjectInfoT* object_info) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreateObjectInfo(fbb, object_info);
- fbb.Finish(message);
- uint8_t* notification = new uint8_t[sizeof(int64_t) + fbb.GetSize()];
- *(reinterpret_cast<int64_t*>(notification)) = fbb.GetSize();
- memcpy(notification + sizeof(int64_t), fbb.GetBufferPointer(), fbb.GetSize());
- return notification;
-}
-
-ObjectTableEntry* get_object_table_entry(
- PlasmaStoreInfo* store_info, const ObjectID& object_id) {
- auto it = store_info->objects.find(object_id);
- if (it == store_info->objects.end()) { return NULL; }
- return it->second.get();
-}
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/plasma.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
deleted file mode 100644
index 275d0c7..0000000
--- a/cpp/src/plasma/plasma.h
+++ /dev/null
@@ -1,191 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_PLASMA_H
-#define PLASMA_PLASMA_H
-
-#include <errno.h>
-#include <inttypes.h>
-#include <stdbool.h>
-#include <stddef.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h> // pid_t
-
-#include <unordered_map>
-#include <unordered_set>
-
-#include "arrow/status.h"
-#include "arrow/util/logging.h"
-#include "format/common_generated.h"
-#include "plasma/common.h"
-
-#define HANDLE_SIGPIPE(s, fd_) \
- do { \
- Status _s = (s); \
- if (!_s.ok()) { \
- if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) { \
- ARROW_LOG(WARNING) \
- << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \
- "sending a message to client on fd " \
- << fd_ << ". " \
- "The client on the other end may have hung up."; \
- } else { \
- return _s; \
- } \
- } \
- } while (0);
-
-/// Allocation granularity used in plasma for object allocation.
-#define BLOCK_SIZE 64
-
-/// Size of object hash digests.
-constexpr int64_t kDigestSize = sizeof(uint64_t);
-
-struct Client;
-
-/// Object request data structure. Used in the plasma_wait_for_objects()
-/// argument.
-typedef struct {
- /// The ID of the requested object. If ID_NIL request any object.
- ObjectID object_id;
- /// Request associated to the object. It can take one of the following values:
- /// - PLASMA_QUERY_LOCAL: return if or when the object is available in the
- /// local Plasma Store.
- /// - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
- /// the system (i.e., either in the local or a remote Plasma Store).
- int type;
- /// Object status. Same as the status returned by plasma_status() function
- /// call. This is filled in by plasma_wait_for_objects1():
- /// - ObjectStatus_Local: object is ready at the local Plasma Store.
- /// - ObjectStatus_Remote: object is ready at a remote Plasma Store.
- /// - ObjectStatus_Nonexistent: object does not exist in the system.
- /// - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
- /// for being transferred or it is transferring.
- int status;
-} ObjectRequest;
-
-/// Mapping from object IDs to type and status of the request.
-typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher> ObjectRequestMap;
-
-/// Handle to access memory mapped file and map it into client address space.
-typedef struct {
- /// The file descriptor of the memory mapped file in the store. It is used as
- /// a unique identifier of the file in the client to look up the corresponding
- /// file descriptor on the client's side.
- int store_fd;
- /// The size in bytes of the memory mapped file.
- int64_t mmap_size;
-} object_handle;
-
-// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
-typedef struct {
- /// Handle for memory mapped file the object is stored in.
- object_handle handle;
- /// The offset in bytes in the memory mapped file of the data.
- ptrdiff_t data_offset;
- /// The offset in bytes in the memory mapped file of the metadata.
- ptrdiff_t metadata_offset;
- /// The size in bytes of the data.
- int64_t data_size;
- /// The size in bytes of the metadata.
- int64_t metadata_size;
-} PlasmaObject;
-
-typedef enum {
- /// Object was created but not sealed in the local Plasma Store.
- PLASMA_CREATED = 1,
- /// Object is sealed and stored in the local Plasma Store.
- PLASMA_SEALED
-} object_state;
-
-typedef enum {
- /// The object was not found.
- OBJECT_NOT_FOUND = 0,
- /// The object was found.
- OBJECT_FOUND = 1
-} object_status;
-
-typedef enum {
- /// Query for object in the local plasma store.
- PLASMA_QUERY_LOCAL = 1,
- /// Query for object in the local plasma store or in a remote plasma store.
- PLASMA_QUERY_ANYWHERE
-} object_request_type;
-
-/// This type is used by the Plasma store. It is here because it is exposed to
-/// the eviction policy.
-struct ObjectTableEntry {
- /// Object id of this object.
- ObjectID object_id;
- /// Object info like size, creation time and owner.
- ObjectInfoT info;
- /// Memory mapped file containing the object.
- int fd;
- /// Size of the underlying map.
- int64_t map_size;
- /// Offset from the base of the mmap.
- ptrdiff_t offset;
- /// Pointer to the object data. Needed to free the object.
- uint8_t* pointer;
- /// Set of clients currently using this object.
- std::unordered_set<Client*> clients;
- /// The state of the object, e.g., whether it is open or sealed.
- object_state state;
- /// The digest of the object. Used to see if two objects are the same.
- unsigned char digest[kDigestSize];
-};
-
-/// The plasma store information that is exposed to the eviction policy.
-struct PlasmaStoreInfo {
- /// Objects that are in the Plasma store.
- std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>, UniqueIDHasher> objects;
- /// The amount of memory (in bytes) that we allow to be allocated in the
- /// store.
- int64_t memory_capacity;
-};
-
-/// Get an entry from the object table and return NULL if the object_id
-/// is not present.
-///
-/// @param store_info The PlasmaStoreInfo that contains the object table.
-/// @param object_id The object_id of the entry we are looking for.
-/// @return The entry associated with the object_id or NULL if the object_id
-/// is not present.
-ObjectTableEntry* get_object_table_entry(
- PlasmaStoreInfo* store_info, const ObjectID& object_id);
-
-/// Print a warning if the status is less than zero. This should be used to check
-/// the success of messages sent to plasma clients. We print a warning instead of
-/// failing because the plasma clients are allowed to die. This is used to handle
-/// situations where the store writes to a client file descriptor, and the client
-/// may already have disconnected. If we have processed the disconnection and
-/// closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we
-/// have not, then we should get a SIGPIPE. If we write to a TCP socket that
-/// isn't connected yet, then we should get an ECONNRESET.
-///
-/// @param status The status to check. If it is less less than zero, we will
-/// print a warning.
-/// @param client_sock The client socket. This is just used to print some extra
-/// information.
-/// @return The errno set.
-int warn_if_sigpipe(int status, int client_sock);
-
-uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
-
-#endif // PLASMA_PLASMA_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/protocol.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
deleted file mode 100644
index 246aa29..0000000
--- a/cpp/src/plasma/protocol.cc
+++ /dev/null
@@ -1,502 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/protocol.h"
-
-#include "flatbuffers/flatbuffers.h"
-#include "format/plasma_generated.h"
-
-#include "plasma/common.h"
-#include "plasma/io.h"
-
-using flatbuffers::uoffset_t;
-
-flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
-to_flatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
- int64_t num_objects) {
- std::vector<flatbuffers::Offset<flatbuffers::String>> results;
- for (int64_t i = 0; i < num_objects; i++) {
- results.push_back(fbb->CreateString(object_ids[i].binary()));
- }
- return fbb->CreateVector(results);
-}
-
-Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer) {
- int64_t type;
- RETURN_NOT_OK(ReadMessage(sock, &type, buffer));
- ARROW_CHECK(type == message_type) << "type = " << type
- << ", message_type = " << message_type;
- return Status::OK();
-}
-
-template <typename Message>
-Status PlasmaSend(int sock, int64_t message_type, flatbuffers::FlatBufferBuilder* fbb,
- const Message& message) {
- fbb->Finish(message);
- return WriteMessage(sock, message_type, fbb->GetSize(), fbb->GetBufferPointer());
-}
-
-// Create messages.
-
-Status SendCreateRequest(
- int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaCreateRequest(
- fbb, fbb.CreateString(object_id.binary()), data_size, metadata_size);
- return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, message);
-}
-
-Status ReadCreateRequest(
- uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
- *data_size = message->data_size();
- *metadata_size = message->metadata_size();
- *object_id = ObjectID::from_binary(message->object_id()->str());
- return Status::OK();
-}
-
-Status SendCreateReply(
- int sock, ObjectID object_id, PlasmaObject* object, int error_code) {
- flatbuffers::FlatBufferBuilder fbb;
- PlasmaObjectSpec plasma_object(object->handle.store_fd, object->handle.mmap_size,
- object->data_offset, object->data_size, object->metadata_offset,
- object->metadata_size);
- auto message = CreatePlasmaCreateReply(
- fbb, fbb.CreateString(object_id.binary()), &plasma_object, (PlasmaError)error_code);
- return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, message);
-}
-
-Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaCreateReply>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- object->handle.store_fd = message->plasma_object()->segment_index();
- object->handle.mmap_size = message->plasma_object()->mmap_size();
- object->data_offset = message->plasma_object()->data_offset();
- object->data_size = message->plasma_object()->data_size();
- object->metadata_offset = message->plasma_object()->metadata_offset();
- object->metadata_size = message->plasma_object()->metadata_size();
- return plasma_error_status(message->error());
-}
-
-// Seal messages.
-
-Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
- flatbuffers::FlatBufferBuilder fbb;
- auto digest_string = fbb.CreateString(reinterpret_cast<char*>(digest), kDigestSize);
- auto message =
- CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()), digest_string);
- return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, message);
-}
-
-Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaSealRequest>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- ARROW_CHECK(message->digest()->size() == kDigestSize);
- memcpy(digest, message->digest()->data(), kDigestSize);
- return Status::OK();
-}
-
-Status SendSealReply(int sock, ObjectID object_id, int error) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaSealReply(
- fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
- return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
-}
-
-Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaSealReply>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- return plasma_error_status(message->error());
-}
-
-// Release messages.
-
-Status SendReleaseRequest(int sock, ObjectID object_id) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaSealRequest(fbb, fbb.CreateString(object_id.binary()));
- return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
-}
-
-Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- return Status::OK();
-}
-
-Status SendReleaseReply(int sock, ObjectID object_id, int error) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaReleaseReply(
- fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
- return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
-}
-
-Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- return plasma_error_status(message->error());
-}
-
-// Delete messages.
-
-Status SendDeleteRequest(int sock, ObjectID object_id) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaDeleteRequest(fbb, fbb.CreateString(object_id.binary()));
- return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, message);
-}
-
-Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- return Status::OK();
-}
-
-Status SendDeleteReply(int sock, ObjectID object_id, int error) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaDeleteReply(
- fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
- return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
-}
-
-Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- return plasma_error_status(message->error());
-}
-
-// Satus messages.
-
-Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message =
- CreatePlasmaStatusRequest(fbb, to_flatbuffer(&fbb, object_ids, num_objects));
- return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, message);
-}
-
-Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
- for (uoffset_t i = 0; i < num_objects; ++i) {
- object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
- }
- return Status::OK();
-}
-
-Status SendStatusReply(
- int sock, ObjectID object_ids[], int object_status[], int64_t num_objects) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message =
- CreatePlasmaStatusReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
- fbb.CreateVector(object_status, num_objects));
- return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message);
-}
-
-int64_t ReadStatusReply_num_objects(uint8_t* data) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
- return message->object_ids()->size();
-}
-
-Status ReadStatusReply(
- uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaStatusReply>(data);
- for (uoffset_t i = 0; i < num_objects; ++i) {
- object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
- }
- for (uoffset_t i = 0; i < num_objects; ++i) {
- object_status[i] = message->status()->data()[i];
- }
- return Status::OK();
-}
-
-// Contains messages.
-
-Status SendContainsRequest(int sock, ObjectID object_id) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaContainsRequest(fbb, fbb.CreateString(object_id.binary()));
- return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, message);
-}
-
-Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- return Status::OK();
-}
-
-Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message =
- CreatePlasmaContainsReply(fbb, fbb.CreateString(object_id.binary()), has_object);
- return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, message);
-}
-
-Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaContainsReply>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- *has_object = message->has_object();
- return Status::OK();
-}
-
-// Connect messages.
-
-Status SendConnectRequest(int sock) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaConnectRequest(fbb);
- return PlasmaSend(sock, MessageType_PlasmaConnectRequest, &fbb, message);
-}
-
-Status ReadConnectRequest(uint8_t* data) {
- return Status::OK();
-}
-
-Status SendConnectReply(int sock, int64_t memory_capacity) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaConnectReply(fbb, memory_capacity);
- return PlasmaSend(sock, MessageType_PlasmaConnectReply, &fbb, message);
-}
-
-Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaConnectReply>(data);
- *memory_capacity = message->memory_capacity();
- return Status::OK();
-}
-
-// Evict messages.
-
-Status SendEvictRequest(int sock, int64_t num_bytes) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaEvictRequest(fbb, num_bytes);
- return PlasmaSend(sock, MessageType_PlasmaEvictRequest, &fbb, message);
-}
-
-Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
- *num_bytes = message->num_bytes();
- return Status::OK();
-}
-
-Status SendEvictReply(int sock, int64_t num_bytes) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaEvictReply(fbb, num_bytes);
- return PlasmaSend(sock, MessageType_PlasmaEvictReply, &fbb, message);
-}
-
-Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaEvictReply>(data);
- num_bytes = message->num_bytes();
- return Status::OK();
-}
-
-// Get messages.
-
-Status SendGetRequest(
- int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaGetRequest(
- fbb, to_flatbuffer(&fbb, object_ids, num_objects), timeout_ms);
- return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, message);
-}
-
-Status ReadGetRequest(
- uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaGetRequest>(data);
- for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
- auto object_id = message->object_ids()->Get(i)->str();
- object_ids.push_back(ObjectID::from_binary(object_id));
- }
- *timeout_ms = message->timeout_ms();
- return Status::OK();
-}
-
-Status SendGetReply(int sock, ObjectID object_ids[],
- std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
- int64_t num_objects) {
- flatbuffers::FlatBufferBuilder fbb;
- std::vector<PlasmaObjectSpec> objects;
-
- for (int i = 0; i < num_objects; ++i) {
- const PlasmaObject& object = plasma_objects[object_ids[i]];
- objects.push_back(PlasmaObjectSpec(object.handle.store_fd, object.handle.mmap_size,
- object.data_offset, object.data_size, object.metadata_offset,
- object.metadata_size));
- }
- auto message = CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
- fbb.CreateVectorOfStructs(objects.data(), num_objects));
- return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, message);
-}
-
-Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
- int64_t num_objects) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaGetReply>(data);
- for (uoffset_t i = 0; i < num_objects; ++i) {
- object_ids[i] = ObjectID::from_binary(message->object_ids()->Get(i)->str());
- }
- for (uoffset_t i = 0; i < num_objects; ++i) {
- const PlasmaObjectSpec* object = message->plasma_objects()->Get(i);
- plasma_objects[i].handle.store_fd = object->segment_index();
- plasma_objects[i].handle.mmap_size = object->mmap_size();
- plasma_objects[i].data_offset = object->data_offset();
- plasma_objects[i].data_size = object->data_size();
- plasma_objects[i].metadata_offset = object->metadata_offset();
- plasma_objects[i].metadata_size = object->metadata_size();
- }
- return Status::OK();
-}
-
-// Fetch messages.
-
-Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message =
- CreatePlasmaFetchRequest(fbb, to_flatbuffer(&fbb, object_ids, num_objects));
- return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, message);
-}
-
-Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
- for (uoffset_t i = 0; i < message->object_ids()->size(); ++i) {
- object_ids.push_back(ObjectID::from_binary(message->object_ids()->Get(i)->str()));
- }
- return Status::OK();
-}
-
-// Wait messages.
-
-Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
- int num_ready_objects, int64_t timeout_ms) {
- flatbuffers::FlatBufferBuilder fbb;
-
- std::vector<flatbuffers::Offset<ObjectRequestSpec>> object_request_specs;
- for (int i = 0; i < num_requests; i++) {
- object_request_specs.push_back(CreateObjectRequestSpec(fbb,
- fbb.CreateString(object_requests[i].object_id.binary()),
- object_requests[i].type));
- }
-
- auto message = CreatePlasmaWaitRequest(
- fbb, fbb.CreateVector(object_request_specs), num_ready_objects, timeout_ms);
- return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, message);
-}
-
-Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
- int64_t* timeout_ms, int* num_ready_objects) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
- *num_ready_objects = message->num_ready_objects();
- *timeout_ms = message->timeout();
-
- for (uoffset_t i = 0; i < message->object_requests()->size(); i++) {
- ObjectID object_id =
- ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str());
- ObjectRequest object_request({object_id, message->object_requests()->Get(i)->type(),
- ObjectStatus_Nonexistent});
- object_requests[object_id] = object_request;
- }
- return Status::OK();
-}
-
-Status SendWaitReply(
- int sock, const ObjectRequestMap& object_requests, int num_ready_objects) {
- flatbuffers::FlatBufferBuilder fbb;
-
- std::vector<flatbuffers::Offset<ObjectReply>> object_replies;
- for (const auto& entry : object_requests) {
- const auto& object_request = entry.second;
- object_replies.push_back(CreateObjectReply(
- fbb, fbb.CreateString(object_request.object_id.binary()), object_request.status));
- }
-
- auto message = CreatePlasmaWaitReply(
- fbb, fbb.CreateVector(object_replies.data(), num_ready_objects), num_ready_objects);
- return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, message);
-}
-
-Status ReadWaitReply(
- uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects) {
- DCHECK(data);
-
- auto message = flatbuffers::GetRoot<PlasmaWaitReply>(data);
- *num_ready_objects = message->num_ready_objects();
- for (int i = 0; i < *num_ready_objects; i++) {
- object_requests[i].object_id =
- ObjectID::from_binary(message->object_requests()->Get(i)->object_id()->str());
- object_requests[i].status = message->object_requests()->Get(i)->status();
- }
- return Status::OK();
-}
-
-// Subscribe messages.
-
-Status SendSubscribeRequest(int sock) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaSubscribeRequest(fbb);
- return PlasmaSend(sock, MessageType_PlasmaSubscribeRequest, &fbb, message);
-}
-
-// Data messages.
-
-Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port) {
- flatbuffers::FlatBufferBuilder fbb;
- auto addr = fbb.CreateString(address, strlen(address));
- auto message =
- CreatePlasmaDataRequest(fbb, fbb.CreateString(object_id.binary()), addr, port);
- return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, message);
-}
-
-Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaDataRequest>(data);
- DCHECK(message->object_id()->size() == sizeof(ObjectID));
- *object_id = ObjectID::from_binary(message->object_id()->str());
- *address = strdup(message->address()->c_str());
- *port = message->port();
- return Status::OK();
-}
-
-Status SendDataReply(
- int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size) {
- flatbuffers::FlatBufferBuilder fbb;
- auto message = CreatePlasmaDataReply(
- fbb, fbb.CreateString(object_id.binary()), object_size, metadata_size);
- return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, message);
-}
-
-Status ReadDataReply(
- uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) {
- DCHECK(data);
- auto message = flatbuffers::GetRoot<PlasmaDataReply>(data);
- *object_id = ObjectID::from_binary(message->object_id()->str());
- *object_size = (int64_t)message->object_size();
- *metadata_size = (int64_t)message->metadata_size();
- return Status::OK();
-}
http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/protocol.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
deleted file mode 100644
index 5d9d136..0000000
--- a/cpp/src/plasma/protocol.h
+++ /dev/null
@@ -1,170 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_PROTOCOL_H
-#define PLASMA_PROTOCOL_H
-
-#include <vector>
-
-#include "arrow/status.h"
-#include "format/plasma_generated.h"
-#include "plasma/plasma.h"
-
-using arrow::Status;
-
-/* Plasma receive message. */
-
-Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer);
-
-/* Plasma Create message functions. */
-
-Status SendCreateRequest(
- int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size);
-
-Status ReadCreateRequest(
- uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size);
-
-Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error);
-
-Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object);
-
-/* Plasma Seal message functions. */
-
-Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
-
-Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest);
-
-Status SendSealReply(int sock, ObjectID object_id, int error);
-
-Status ReadSealReply(uint8_t* data, ObjectID* object_id);
-
-/* Plasma Get message functions. */
-
-Status SendGetRequest(
- int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms);
-
-Status ReadGetRequest(
- uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms);
-
-Status SendGetReply(int sock, ObjectID object_ids[],
- std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
- int64_t num_objects);
-
-Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
- int64_t num_objects);
-
-/* Plasma Release message functions. */
-
-Status SendReleaseRequest(int sock, ObjectID object_id);
-
-Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id);
-
-Status SendReleaseReply(int sock, ObjectID object_id, int error);
-
-Status ReadReleaseReply(uint8_t* data, ObjectID* object_id);
-
-/* Plasma Delete message functions. */
-
-Status SendDeleteRequest(int sock, ObjectID object_id);
-
-Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id);
-
-Status SendDeleteReply(int sock, ObjectID object_id, int error);
-
-Status ReadDeleteReply(uint8_t* data, ObjectID* object_id);
-
-/* Satus messages. */
-
-Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
-
-Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects);
-
-Status SendStatusReply(
- int sock, ObjectID object_ids[], int object_status[], int64_t num_objects);
-
-int64_t ReadStatusReply_num_objects(uint8_t* data);
-
-Status ReadStatusReply(
- uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects);
-
-/* Plasma Constains message functions. */
-
-Status SendContainsRequest(int sock, ObjectID object_id);
-
-Status ReadContainsRequest(uint8_t* data, ObjectID* object_id);
-
-Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
-
-Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object);
-
-/* Plasma Connect message functions. */
-
-Status SendConnectRequest(int sock);
-
-Status ReadConnectRequest(uint8_t* data);
-
-Status SendConnectReply(int sock, int64_t memory_capacity);
-
-Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity);
-
-/* Plasma Evict message functions (no reply so far). */
-
-Status SendEvictRequest(int sock, int64_t num_bytes);
-
-Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes);
-
-Status SendEvictReply(int sock, int64_t num_bytes);
-
-Status ReadEvictReply(uint8_t* data, int64_t& num_bytes);
-
-/* Plasma Fetch Remote message functions. */
-
-Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
-
-Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids);
-
-/* Plasma Wait message functions. */
-
-Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
- int num_ready_objects, int64_t timeout_ms);
-
-Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
- int64_t* timeout_ms, int* num_ready_objects);
-
-Status SendWaitReply(
- int sock, const ObjectRequestMap& object_requests, int num_ready_objects);
-
-Status ReadWaitReply(
- uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects);
-
-/* Plasma Subscribe message functions. */
-
-Status SendSubscribeRequest(int sock);
-
-/* Data messages. */
-
-Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port);
-
-Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port);
-
-Status SendDataReply(
- int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size);
-
-Status ReadDataReply(
- uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size);
-
-#endif /* PLASMA_PROTOCOL */