You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/23 18:39:12 UTC

[05/14] arrow git commit: [C++] Remove Plasma source tree for 0.5.0 release pending IP Clearance

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/fling.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/fling.cc b/cpp/src/plasma/fling.cc
deleted file mode 100644
index 79da4f4..0000000
--- a/cpp/src/plasma/fling.cc
+++ /dev/null
@@ -1,90 +0,0 @@
-// Copyright 2013 Sharvil Nanavati
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "plasma/fling.h"
-
-#include <string.h>
-
-// Fill in `msg` and `iov` for a one-byte transfer that carries ancillary data.
-//
-// Only the fields assigned below are written; every other member of `msg`
-// keeps whatever value it had on entry.
-//
-// @param msg Message header to populate.
-// @param iov Scatter/gather element that `msg` is pointed at.
-// @param buf Buffer used both as the one-byte payload and as the control
-//        (ancillary data) buffer.
-// @param buf_len Size of `buf` in bytes; becomes the control buffer length.
-void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len) {
-  // No peer address is attached to the message.
-  msg->msg_name = NULL;
-  msg->msg_namelen = 0;
-
-  // The payload is exactly one byte, taken from the start of buf.
-  iov->iov_len = 1;
-  iov->iov_base = buf;
-  msg->msg_iovlen = 1;
-  msg->msg_iov = iov;
-
-  // The ancillary data shares the same buffer.
-  msg->msg_controllen = buf_len;
-  msg->msg_control = buf;
-}
-
-// Send a file descriptor over a unix domain socket as SCM_RIGHTS ancillary
-// data attached to a one-byte message.
-//
-// @param conn Unix domain socket to send the file descriptor over.
-// @param fd File descriptor to send over.
-// @return 0 if sendmsg() succeeded, otherwise the negative value it returned.
-int send_fd(int conn, int fd) {
-  struct msghdr msg;
-  struct iovec iov;
-  char buf[CMSG_SPACE(sizeof(int))];
-  // init_msg() only assigns some members of msg, so clear the whole structure
-  // (and the control buffer) to avoid handing uninitialized bytes to sendmsg().
-  memset(&msg, 0, sizeof(msg));
-  memset(buf, 0, sizeof(buf));
-
-  init_msg(&msg, &iov, buf, sizeof(buf));
-
-  struct cmsghdr* header = CMSG_FIRSTHDR(&msg);
-  header->cmsg_level = SOL_SOCKET;
-  header->cmsg_type = SCM_RIGHTS;
-  header->cmsg_len = CMSG_LEN(sizeof(int));
-  // Copy the descriptor in with memcpy rather than storing through a cast
-  // pointer: CMSG_DATA() is not guaranteed to be suitably aligned for int.
-  memcpy(CMSG_DATA(header), &fd, sizeof(fd));
-
-  // Send file descriptor.
-  ssize_t r = sendmsg(conn, &msg, 0);
-  return (r >= 0) ? 0 : static_cast<int>(r);
-}
-
-// Receive a file descriptor over a unix domain socket.
-//
-// Reads one message from `conn` and scans its SCM_RIGHTS control messages.
-// Exactly one descriptor is expected. If more than one arrives, all of them
-// are closed, errno is set to EBADMSG and -1 is returned.
-//
-// @param conn Unix domain socket to receive the file descriptor from.
-// @return The received file descriptor; -1 if recvmsg() failed, if no
-//         descriptor was attached, or if more than one was attached.
-int recv_fd(int conn) {
-  struct msghdr msg;
-  struct iovec iov;
-  char buf[CMSG_SPACE(sizeof(int))];
-  // init_msg() only assigns some members of msg, so start from a clean slate.
-  memset(&msg, 0, sizeof(msg));
-  memset(buf, 0, sizeof(buf));
-  init_msg(&msg, &iov, buf, sizeof(buf));
-
-  if (recvmsg(conn, &msg, 0) == -1) { return -1; }
-
-  int found_fd = -1;
-  bool got_extra_fds = false;
-  for (struct cmsghdr* header = CMSG_FIRSTHDR(&msg); header != NULL;
-       header = CMSG_NXTHDR(&msg, header)) {
-    if (header->cmsg_level != SOL_SOCKET || header->cmsg_type != SCM_RIGHTS) {
-      continue;
-    }
-    // Number of bytes between the start of the header and its payload.
-    size_t payload_offset =
-        static_cast<size_t>(CMSG_DATA(header) - reinterpret_cast<unsigned char*>(header));
-    // A header shorter than its own prefix carries no payload; skipping it
-    // also keeps the subtraction below from wrapping around.
-    if (header->cmsg_len < payload_offset) { continue; }
-    size_t count = (header->cmsg_len - payload_offset) / sizeof(int);
-    for (size_t i = 0; i < count; ++i) {
-      // Copy out with memcpy: CMSG_DATA() is not guaranteed to be aligned
-      // for int, so it must not be dereferenced through a cast pointer.
-      int fd;
-      memcpy(&fd, CMSG_DATA(header) + i * sizeof(int), sizeof(fd));
-      if (found_fd == -1) {
-        found_fd = fd;
-      } else {
-        close(fd);
-        got_extra_fds = true;
-      }
-    }
-  }
-
-  // The sender sent us more than one file descriptor. We've closed
-  // them all to prevent fd leaks but notify the caller that we got
-  // a bad message.
-  if (got_extra_fds) {
-    close(found_fd);
-    errno = EBADMSG;
-    return -1;
-  }
-
-  return found_fd;
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/fling.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/fling.h b/cpp/src/plasma/fling.h
deleted file mode 100644
index 78ac9d1..0000000
--- a/cpp/src/plasma/fling.h
+++ /dev/null
@@ -1,52 +0,0 @@
-// Copyright 2013 Sharvil Nanavati
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// FLING: Exchanging file descriptors over sockets
-//
-// This is a little library for sending file descriptors over a socket
-// between processes. The reason for doing that (as opposed to using
-// filenames to share the files) is so (a) no files remain in the
-// filesystem after all the processes terminate, (b) to make sure that
-// there are no name collisions and (c) to be able to control who has
-// access to the data.
-//
-// Most of the code is from https://github.com/sharvil/flingfd
-
-#include <errno.h>
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-// This is necessary for Mac OS X, see http://www.apuebook.com/faqs2e.html
-// (10).
-#if !defined(CMSG_SPACE) && !defined(CMSG_LEN)
-#define CMSG_SPACE(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + __DARWIN_ALIGN32(len))
-#define CMSG_LEN(len) (__DARWIN_ALIGN32(sizeof(struct cmsghdr)) + (len))
-#endif
-
-void init_msg(struct msghdr* msg, struct iovec* iov, char* buf, size_t buf_len);
-
-// Send a file descriptor over a unix domain socket.
-//
-// @param conn Unix domain socket to send the file descriptor over.
-// @param fd File descriptor to send over.
-// @return Status code which is < 0 on failure.
-int send_fd(int conn, int fd);
-
-// Receive a file descriptor over a unix domain socket.
-//
-// @param conn Unix domain socket to receive the file descriptor from.
-// @return File descriptor or a value < 0 on failure.
-int recv_fd(int conn);

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/format/.gitignore
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/.gitignore b/cpp/src/plasma/format/.gitignore
deleted file mode 100644
index b2ddb05..0000000
--- a/cpp/src/plasma/format/.gitignore
+++ /dev/null
@@ -1 +0,0 @@
-*_generated.h

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/format/common.fbs
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/common.fbs b/cpp/src/plasma/format/common.fbs
deleted file mode 100644
index 4d7d285..0000000
--- a/cpp/src/plasma/format/common.fbs
+++ /dev/null
@@ -1,34 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Object information data structure.
-table ObjectInfo {
-  // Object ID of this object.
-  object_id: string;
-  // Number of bytes the content of this object occupies in memory.
-  data_size: long;
-  // Number of bytes the metadata of this object occupies in memory.
-  metadata_size: long;
-  // Unix epoch of when this object was created.
-  create_time: long;
-  // How long creation of this object took.
-  construct_duration: long;
-  // Hash of the object content.
-  digest: string;
-  // Specifies if this object was deleted or added.
-  is_deletion: bool;
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/format/plasma.fbs
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
deleted file mode 100644
index 23782ad..0000000
--- a/cpp/src/plasma/format/plasma.fbs
+++ /dev/null
@@ -1,291 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Plasma protocol specification
-
-enum MessageType:int {
-  // Create a new object.
-  PlasmaCreateRequest = 1,
-  PlasmaCreateReply,
-  // Seal an object.
-  PlasmaSealRequest,
-  PlasmaSealReply,
-  // Get an object that is stored on the local Plasma store.
-  PlasmaGetRequest,
-  PlasmaGetReply,
-  // Release an object.
-  PlasmaReleaseRequest,
-  PlasmaReleaseReply,
-  // Delete an object.
-  PlasmaDeleteRequest,
-  PlasmaDeleteReply,
-  // Get status of an object.
-  PlasmaStatusRequest,
-  PlasmaStatusReply,
-  // See if the store contains an object (will be deprecated).
-  PlasmaContainsRequest,
-  PlasmaContainsReply,
-  // Get information for a newly connecting client.
-  PlasmaConnectRequest,
-  PlasmaConnectReply,
-  // Make room for new objects in the plasma store.
-  PlasmaEvictRequest,
-  PlasmaEvictReply,
-  // Fetch objects from remote Plasma stores.
-  PlasmaFetchRequest,
-  // Wait for objects to be ready either from local or remote Plasma stores.
-  PlasmaWaitRequest,
-  PlasmaWaitReply,
-  // Subscribe to a list of objects or to all objects.
-  PlasmaSubscribeRequest,
-  // Unsubscribe.
-  PlasmaUnsubscribeRequest,
-  // Sending and receiving data.
-  // PlasmaDataRequest initiates sending the data, there will be one
-  // such message per data transfer.
-  PlasmaDataRequest,
-  // PlasmaDataReply contains the actual data and is sent back to the
-  // object store that requested the data. For each transfer, multiple
-  // reply messages get sent. Each one contains a fixed number of bytes.
-  PlasmaDataReply,
-  // Object notifications.
-  PlasmaNotification
-}
-
-enum PlasmaError:int {
-  // Operation was successful.
-  OK,
-  // Trying to create an object that already exists.
-  ObjectExists,
-  // Trying to access an object that doesn't exist.
-  ObjectNonexistent,
-  // Trying to create an object but there isn't enough space in the store.
-  OutOfMemory
-}
-
-// Plasma store messages
-
-struct PlasmaObjectSpec {
-  // Index of the memory segment (= memory mapped file) that
-  // this object is allocated in.
-  segment_index: int;
-  // Size in bytes of this segment (needed to call mmap).
-  mmap_size: ulong;
-  // The offset in bytes in the memory mapped file of the data.
-  data_offset: ulong;
-  // The size in bytes of the data.
-  data_size: ulong;
-  // The offset in bytes in the memory mapped file of the metadata.
-  metadata_offset: ulong;
-  // The size in bytes of the metadata.
-  metadata_size: ulong;
-}
-
-table PlasmaCreateRequest {
-  // ID of the object to be created.
-  object_id: string;
-  // The size of the object's data in bytes.
-  data_size: ulong;
-  // The size of the object's metadata in bytes.
-  metadata_size: ulong;
-}
-
-table PlasmaCreateReply {
-  // ID of the object that was created.
-  object_id: string;
-  // The object that is returned with this reply.
-  plasma_object: PlasmaObjectSpec;
-  // Error that occurred for this call.
-  error: PlasmaError;
-}
-
-table PlasmaSealRequest {
-  // ID of the object to be sealed.
-  object_id: string;
-  // Hash of the object data.
-  digest: string;
-}
-
-table PlasmaSealReply {
-  // ID of the object that was sealed.
-  object_id: string;
-  // Error code.
-  error: PlasmaError;
-}
-
-table PlasmaGetRequest {
-  // IDs of the objects stored at local Plasma store we are getting.
-  object_ids: [string];
-  // The number of milliseconds before the request should timeout.
-  timeout_ms: long;
-}
-
-table PlasmaGetReply {
-  // IDs of the objects being returned.
-  // This number can be smaller than the number of requested
-  // objects if not all requested objects are stored and sealed
-  // in the local Plasma store.
-  object_ids: [string];
-  // Plasma object information, in the same order as their IDs.
-  plasma_objects: [PlasmaObjectSpec];
-  // The number of elements in both object_ids and plasma_objects arrays must agree.
-}
-
-table PlasmaReleaseRequest {
-  // ID of the object to be released.
-  object_id: string;
-}
-
-table PlasmaReleaseReply {
-  // ID of the object that was released.
-  object_id: string;
-  // Error code.
-  error: PlasmaError;
-}
-
-table PlasmaDeleteRequest {
-  // ID of the object to be deleted.
-  object_id: string;
-}
-
-table PlasmaDeleteReply {
-  // ID of the object that was deleted.
-  object_id: string;
-  // Error code.
-  error: PlasmaError;
-}
-
-table PlasmaStatusRequest {
-  // IDs of the objects stored at local Plasma store we request the status of.
-  object_ids: [string];
-}
-
-enum ObjectStatus:int {
-  // Object is stored in the local Plasma Store.
-  Local = 1,
-  // Object is stored on a remote Plasma store, and it is not stored on the
-  // local Plasma Store.
-  Remote,
-  // Object is not stored in the system.
-  Nonexistent,
-  // Object is currently transferred from a remote Plasma store to the local
-  // Plasma Store.
-  Transfer
-}
-
-table PlasmaStatusReply {
-  // IDs of the objects being returned.
-  object_ids: [string];
-  // Status of the object.
-  status: [ObjectStatus];
-}
-
-// PlasmaContains is a subset of PlasmaStatus which does not
-// involve the plasma manager, only the store. We should consider
-// unifying them in the future and deprecating PlasmaContains.
-
-table PlasmaContainsRequest {
-  // ID of the object we are querying.
-  object_id: string;
-}
-
-table PlasmaContainsReply {
-  // ID of the object we are querying.
-  object_id: string;
-  // 1 if the object is in the store and 0 otherwise.
-  has_object: int;
-}
-
-// PlasmaConnect is used by a plasma client the first time it connects with the
-// store. This is not really necessary, but is used to get some information
-// about the store such as its memory capacity.
-
-table PlasmaConnectRequest {
-}
-
-table PlasmaConnectReply {
-  // The memory capacity of the store.
-  memory_capacity: long;
-}
-
-table PlasmaEvictRequest {
-  // Number of bytes that shall be freed.
-  num_bytes: ulong;
-}
-
-table PlasmaEvictReply {
-  // Number of bytes that have been freed.
-  num_bytes: ulong;
-}
-
-table PlasmaFetchRequest {
-  // IDs of objects to be gotten.
-  object_ids: [string];
-}
-
-table ObjectRequestSpec {
-  // ID of the object.
-  object_id: string;
-  // The type of the object. This specifies whether we
-  // will be waiting for an object store in the local or
-  // global Plasma store.
-  type: int;
-}
-
-table PlasmaWaitRequest {
-  // Array of object requests whose status we are asking for.
-  object_requests: [ObjectRequestSpec];
-  // Number of objects expected to be returned, if available.
-  num_ready_objects: int;
-  // timeout
-  timeout: long;
-}
-
-table ObjectReply {
-  // ID of the object.
-  object_id: string;
-  // The object status. This specifies where the object is stored.
-  status: int;
-}
-
-table PlasmaWaitReply {
-  // Array of object requests being returned.
-  object_requests: [ObjectReply];
-  // Number of objects expected to be returned, if available.
-  num_ready_objects: int;
-}
-
-table PlasmaSubscribeRequest {
-}
-
-table PlasmaDataRequest {
-  // ID of the object that is requested.
-  object_id: string;
-  // The host address where the data shall be sent to.
-  address: string;
-  // The port of the manager the data shall be sent to.
-  port: int;
-}
-
-table PlasmaDataReply {
-  // ID of the object that will be sent.
-  object_id: string;
-  // Size of the object data in bytes.
-  object_size: ulong;
-  // Size of the metadata in bytes.
-  metadata_size: ulong;
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
deleted file mode 100644
index 5875ebb..0000000
--- a/cpp/src/plasma/io.cc
+++ /dev/null
@@ -1,212 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/io.h"
-
-#include "plasma/common.h"
-
-using arrow::Status;
-
-/* Number of times we try binding to a socket. */
-#define NUM_BIND_ATTEMPTS 5
-#define BIND_TIMEOUT_MS 100
-
-/* Number of times we try connecting to a socket. */
-#define NUM_CONNECT_ATTEMPTS 50
-#define CONNECT_TIMEOUT_MS 100
-
-/* Write exactly `length` bytes from `cursor` to `fd`.
- *
- * write() is called repeatedly until everything has been written. A call that
- * fails with EAGAIN, EWOULDBLOCK or EINTR is simply retried.
- *
- * @param fd File descriptor to write to.
- * @param cursor Start of the data to write.
- * @param length Number of bytes to write.
- * @return Status::OK() once all bytes are written; an IOError carrying
- *         strerror(errno) on any other write() failure, or an IOError if
- *         write() returns 0. */
-Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
-  size_t written = 0;
-  while (written < length) {
-    /* Try to push out everything that is still pending. */
-    ssize_t rc = write(fd, cursor + written, length - written);
-    if (rc < 0) {
-      bool transient = (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
-      if (!transient) { return Status::IOError(std::string(strerror(errno))); }
-      continue;
-    }
-    if (rc == 0) { return Status::IOError("Encountered unexpected EOF"); }
-    ARROW_CHECK(rc > 0);
-    written += rc;
-  }
-
-  return Status::OK();
-}
-
-/* Write one framed message to `fd`.
- *
- * The frame consists of three raw int64_t values (PLASMA_PROTOCOL_VERSION,
- * `type`, `length`) followed by `length` payload bytes.
- *
- * @param fd File descriptor to write to.
- * @param type Message type to put in the header.
- * @param length Number of payload bytes in `bytes`.
- * @param bytes Payload.
- * @return The first non-OK status returned by WriteBytes, or the status of the
- *         final payload write. */
-Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
-  int64_t version = PLASMA_PROTOCOL_VERSION;
-  /* The header fields are sent one at a time, in this order. */
-  int64_t* const header_fields[] = {&version, &type, &length};
-  for (int64_t* field : header_fields) {
-    RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(field), sizeof(*field)));
-  }
-  return WriteBytes(fd, bytes, length * sizeof(char));
-}
-
-/* Read exactly `length` bytes from `fd` into `cursor`.
- *
- * read() is called repeatedly until the requested amount has arrived. A call
- * that fails with EAGAIN, EWOULDBLOCK or EINTR is simply retried.
- *
- * @param fd File descriptor to read from.
- * @param cursor Destination buffer; must have room for `length` bytes.
- * @param length Number of bytes to read.
- * @return Status::OK() once all bytes are read; an IOError carrying
- *         strerror(errno) on any other read() failure, or an IOError if
- *         read() reports EOF before `length` bytes were received. */
-Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
-  size_t received = 0;
-  /* Loop ends on EOF, on a hard error, or after 'length' bytes in total. */
-  while (received < length) {
-    ssize_t rc = read(fd, cursor + received, length - received);
-    if (rc < 0) {
-      bool transient = (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR);
-      if (!transient) { return Status::IOError(std::string(strerror(errno))); }
-      continue;
-    }
-    if (rc == 0) { return Status::IOError("Encountered unexpected EOF"); }
-    ARROW_CHECK(rc > 0);
-    received += rc;
-  }
-
-  return Status::OK();
-}
-
-/* Read one framed message from `fd`.
- *
- * The frame mirrors what WriteMessage produces: three raw int64_t values
- * (protocol version, message type, payload length) followed by the payload.
- * Whenever a read fails, `*type` is set to DISCONNECT_CLIENT before the error
- * is returned (via RETURN_NOT_OK_ELSE; presumably it evaluates its second
- * argument and returns the failed status -- confirm against its definition).
- *
- * @param fd File descriptor to read from.
- * @param type Output: message type from the header, or DISCONNECT_CLIENT on
- *        failure.
- * @param buffer Output: receives the payload. It is grown when too small but
- *        never shrunk, so only the first `length` bytes belong to this message.
- * @return Status::OK() on success, otherwise the error from the failed read or
- *         an IOError for a negative payload length. */
-Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) {
-  int64_t version;
-  RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)),
-      *type = DISCONNECT_CLIENT);
-  ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version;
-  RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)),
-      *type = DISCONNECT_CLIENT);
-  /* WriteMessage sends the length as an int64_t, so read it back with the same
-   * width instead of size_t, whose size is platform dependent. */
-  int64_t length;
-  RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)),
-      *type = DISCONNECT_CLIENT);
-  if (length < 0) {
-    /* A negative length can only come from a corrupt or hostile peer. */
-    *type = DISCONNECT_CLIENT;
-    return Status::IOError("Received message with negative length");
-  }
-  size_t payload_size = static_cast<size_t>(length);
-  if (payload_size > buffer->size()) { buffer->resize(payload_size); }
-  RETURN_NOT_OK_ELSE(
-      ReadBytes(fd, buffer->data(), payload_size), *type = DISCONNECT_CLIENT);
-  return Status::OK();
-}
-
-/* Create a Unix domain stream socket bound to `pathname`.
- *
- * Any existing filesystem entry at `pathname` is unlinked before binding.
- *
- * @param pathname Filesystem path of the socket; including its terminating
- *        NUL it must fit into sockaddr_un::sun_path.
- * @param shall_listen If true, listen() is also called on the socket with a
- *        backlog of 128.
- * @return The socket file descriptor, or -1 on failure (an error is logged
- *         and the socket, if it was created, is closed). */
-int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
-  const int sock = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (sock < 0) {
-    ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
-    return -1;
-  }
-
-  /* Tell the system to allow the port to be reused. */
-  int reuse = 1;
-  const int opt_rc = setsockopt(
-      sock, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&reuse), sizeof(reuse));
-  if (opt_rc < 0) {
-    ARROW_LOG(ERROR) << "setsockopt failed for pathname " << pathname;
-    close(sock);
-    return -1;
-  }
-
-  /* Remove a stale socket file, if any, before binding to the path. */
-  unlink(pathname.c_str());
-
-  struct sockaddr_un addr;
-  memset(&addr, 0, sizeof(addr));
-  addr.sun_family = AF_UNIX;
-  /* Path length including the terminating NUL. */
-  const size_t path_bytes = pathname.size() + 1;
-  if (path_bytes > sizeof(addr.sun_path)) {
-    ARROW_LOG(ERROR) << "Socket pathname is too long.";
-    close(sock);
-    return -1;
-  }
-  strncpy(addr.sun_path, pathname.c_str(), path_bytes);
-
-  if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) != 0) {
-    ARROW_LOG(ERROR) << "Bind failed for pathname " << pathname;
-    close(sock);
-    return -1;
-  }
-
-  if (shall_listen) {
-    if (listen(sock, 128) == -1) {
-      ARROW_LOG(ERROR) << "Could not listen to socket " << pathname;
-      close(sock);
-      return -1;
-    }
-  }
-  return sock;
-}
-
-/* Connect to the Unix domain socket at `pathname`, retrying on failure.
- *
- * @param pathname Filesystem path of the socket to connect to.
- * @param num_retries Maximum number of connection attempts; a negative value
- *        selects NUM_CONNECT_ATTEMPTS.
- * @param timeout Milliseconds to sleep after each failed attempt; a negative
- *        value selects CONNECT_TIMEOUT_MS.
- * @return The connected socket file descriptor. If no attempt succeeded, a
- *         FATAL message is logged and the last (failed) result is returned. */
-int connect_ipc_sock_retry(
-    const std::string& pathname, int num_retries, int64_t timeout) {
-  /* Negative arguments mean "use the built-in default". */
-  if (num_retries < 0) { num_retries = NUM_CONNECT_ATTEMPTS; }
-  if (timeout < 0) { timeout = CONNECT_TIMEOUT_MS; }
-
-  int fd = -1;
-  int attempt = 0;
-  while (attempt < num_retries) {
-    fd = connect_ipc_sock(pathname);
-    if (fd >= 0) { break; }
-    /* Only the first failure is reported, to keep the log quiet. */
-    if (attempt == 0) {
-      ARROW_LOG(ERROR) << "Connection to socket failed for pathname " << pathname;
-    }
-    /* Sleep for timeout milliseconds. */
-    usleep(static_cast<int>(timeout * 1000));
-    ++attempt;
-  }
-
-  /* If we could not connect to the socket, exit. */
-  if (fd == -1) { ARROW_LOG(FATAL) << "Could not connect to socket " << pathname; }
-  return fd;
-}
-
-/* Connect once to the Unix domain stream socket at `pathname`.
- *
- * @param pathname Filesystem path of the socket; including its terminating
- *        NUL it must fit into sockaddr_un::sun_path.
- * @return The connected socket file descriptor, or -1 on failure. On every
- *         failure path the socket created here is closed again. */
-int connect_ipc_sock(const std::string& pathname) {
-  struct sockaddr_un socket_address;
-  int socket_fd;
-
-  socket_fd = socket(AF_UNIX, SOCK_STREAM, 0);
-  if (socket_fd < 0) {
-    ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
-    return -1;
-  }
-
-  memset(&socket_address, 0, sizeof(socket_address));
-  socket_address.sun_family = AF_UNIX;
-  if (pathname.size() + 1 > sizeof(socket_address.sun_path)) {
-    ARROW_LOG(ERROR) << "Socket pathname is too long.";
-    /* Do not leak the socket that was just created. */
-    close(socket_fd);
-    return -1;
-  }
-  strncpy(socket_address.sun_path, pathname.c_str(), pathname.size() + 1);
-
-  if (connect(socket_fd, (struct sockaddr*)&socket_address, sizeof(socket_address)) !=
-      0) {
-    close(socket_fd);
-    return -1;
-  }
-
-  return socket_fd;
-}
-
-/* Accept one pending connection on a listening socket.
- *
- * @param socket_fd Listening socket to accept from.
- * @return File descriptor of the accepted connection, or -1 (after logging an
- *         error) if accept() failed. */
-int AcceptClient(int socket_fd) {
-  const int accepted = accept(socket_fd, NULL, NULL);
-  if (accepted >= 0) { return accepted; }
-  ARROW_LOG(ERROR) << "Error reading from socket.";
-  return -1;
-}
-
-/* Read one length-prefixed message from `sock`.
- *
- * The format read here is an int64_t byte count followed by that many payload
- * bytes. On any failure the socket is closed and NULL is returned.
- *
- * @param sock Socket to read from.
- * @return A malloc()-allocated buffer holding the payload, or NULL on failure.
- *         Presumably the caller is expected to free() it -- confirm at the
- *         call sites. */
-uint8_t* read_message_async(int sock) {
-  int64_t size;
-  Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
-  if (!s.ok()) {
-    /* The other side has closed the socket. */
-    ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
-    close(sock);
-    return NULL;
-  }
-  /* A negative size cannot be allocated; treat it like any other failure. */
-  uint8_t* message = NULL;
-  if (size >= 0) {
-    message = reinterpret_cast<uint8_t*>(malloc(static_cast<size_t>(size)));
-  }
-  /* malloc(0) may legitimately return NULL, so only a NULL result for a
-   * non-empty request counts as an allocation failure. */
-  if (size < 0 || (message == NULL && size > 0)) {
-    ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
-    close(sock);
-    return NULL;
-  }
-  s = ReadBytes(sock, message, size);
-  if (!s.ok()) {
-    /* The other side has closed the socket. */
-    ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
-    /* Release the partially filled buffer so it does not leak. */
-    free(message);
-    close(sock);
-    return NULL;
-  }
-  return message;
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/io.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
deleted file mode 100644
index 43c3fb5..0000000
--- a/cpp/src/plasma/io.h
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_IO_H
-#define PLASMA_IO_H
-
-#include <inttypes.h>
-#include <sys/socket.h>
-#include <sys/un.h>
-#include <unistd.h>
-
-#include <string>
-#include <vector>
-
-#include "arrow/status.h"
-
-// TODO(pcm): Replace our own custom message header (message type,
-// message length, plasma protocol version) with one that is serialized
-// using flatbuffers.
-#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
-#define DISCONNECT_CLIENT 0
-
-arrow::Status WriteBytes(int fd, uint8_t* cursor, size_t length);
-
-arrow::Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes);
-
-arrow::Status ReadBytes(int fd, uint8_t* cursor, size_t length);
-
-arrow::Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer);
-
-int bind_ipc_sock(const std::string& pathname, bool shall_listen);
-
-int connect_ipc_sock(const std::string& pathname);
-
-int connect_ipc_sock_retry(const std::string& pathname, int num_retries, int64_t timeout);
-
-int AcceptClient(int socket_fd);
-
-uint8_t* read_message_async(int sock);
-
-#endif  // PLASMA_IO_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/malloc.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/malloc.cc b/cpp/src/plasma/malloc.cc
deleted file mode 100644
index 97c9a16..0000000
--- a/cpp/src/plasma/malloc.cc
+++ /dev/null
@@ -1,178 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/malloc.h"
-
-#include <assert.h>
-#include <stddef.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <sys/mman.h>
-#include <unistd.h>
-
-#include <unordered_map>
-
-#include "plasma/common.h"
-
-/* Compile the bundled dlmalloc.c into this translation unit with its mmap and
- * munmap hooks redirected to fake_mmap()/fake_munmap(), which are defined
- * further down in this file. */
-extern "C" {
-/* Forward declarations so the macros below can refer to the replacements. */
-void* fake_mmap(size_t);
-int fake_munmap(void*, int64_t);
-
-/* Configuration macros that are in effect while dlmalloc.c is included. */
-#define MMAP(s) fake_mmap(s)
-#define MUNMAP(a, s) fake_munmap(a, s)
-#define DIRECT_MMAP(s) fake_mmap(s)
-#define DIRECT_MUNMAP(a, s) fake_munmap(a, s)
-#define USE_DL_PREFIX
-#define HAVE_MORECORE 0
-#define DEFAULT_MMAP_THRESHOLD MAX_SIZE_T
-#define DEFAULT_GRANULARITY ((size_t)128U * 1024U)
-
-#include "thirdparty/dlmalloc.c"  // NOLINT
-
-/* Remove the configuration macros again so they do not affect the rest of
- * this file. NOTE(review): DEFAULT_MMAP_THRESHOLD is defined above but not
- * undefined here, unlike the others -- confirm whether that is intentional. */
-#undef MMAP
-#undef MUNMAP
-#undef DIRECT_MMAP
-#undef DIRECT_MUNMAP
-#undef USE_DL_PREFIX
-#undef HAVE_MORECORE
-#undef DEFAULT_GRANULARITY
-}
-
-/* Bookkeeping for one segment created by fake_mmap(). */
-struct mmap_record {
-  /* File descriptor of the buffer that backs the mapping. */
-  int fd;
-  /* Size in bytes that was passed to mmap(); this includes the extra
-   * sizeof(size_t) bytes that fake_mmap() adds to the requested size. */
-  int64_t size;
-};
-
-namespace {
-
-/** Hashtable that contains one entry per segment that we got from the OS
- *  via mmap. Associates the address of that segment with its file descriptor
- *  and size. */
-std::unordered_map<void*, mmap_record> mmap_records;
-
-} /* namespace */
-
-/* Factor by which fake_mmap() multiplies dlmalloc's mparams.granularity after
- * each successful mapping. */
-constexpr int GRANULARITY_MULTIPLIER = 2;
-
-/* Return the address `n` bytes after `p`. */
-static void* pointer_advance(void* p, ptrdiff_t n) {
-  unsigned char* bytes = static_cast<unsigned char*>(p);
-  return bytes + n;
-}
-
-/* Return the address `n` bytes before `p`. */
-static void* pointer_retreat(void* p, ptrdiff_t n) {
-  unsigned char* bytes = static_cast<unsigned char*>(p);
-  return bytes - n;
-}
-
-/* Return the signed distance in bytes from `pfrom` to `pto`. */
-static ptrdiff_t pointer_distance(void const* pfrom, void const* pto) {
-  unsigned char const* first = static_cast<unsigned char const*>(pfrom);
-  unsigned char const* last = static_cast<unsigned char const*>(pto);
-  return last - first;
-}
-
-/* Create a buffer. This is creating a temporary file and then
- * immediately unlinking it so we do not leave traces in the system.
- *
- * @param size Size in bytes the file is truncated to.
- * @return File descriptor of the unlinked temporary file, or -1 on failure.
- *         On failure nothing opened here is left open. */
-int create_buffer(int64_t size) {
-  /* Start from a defined value; the Windows branch below only assigns fd when
-   * CreateFileMapping fails. NOTE(review): that branch never produces a
-   * usable descriptor -- confirm whether Windows is meant to be supported. */
-  int fd = -1;
-#ifdef _WIN32
-  if (!CreateFileMapping(INVALID_HANDLE_VALUE, NULL, PAGE_READWRITE,
-          (DWORD)((uint64_t)size >> (CHAR_BIT * sizeof(DWORD))), (DWORD)(uint64_t)size,
-          NULL)) {
-    fd = -1;
-  }
-#else
-#ifdef __linux__
-  constexpr char file_template[] = "/dev/shm/plasmaXXXXXX";
-#else
-  constexpr char file_template[] = "/tmp/plasmaXXXXXX";
-#endif
-  char file_name[32];
-  strncpy(file_name, file_template, 32);
-  fd = mkstemp(file_name);
-  if (fd < 0) return -1;
-  /* The stream is deliberately not fclose()d on success: closing it would
-   * also close fd, which is handed back to the caller. */
-  FILE* file = fdopen(fd, "a+");
-  if (!file) {
-    close(fd);
-    return -1;
-  }
-  if (unlink(file_name) != 0) {
-    /* fclose() releases both the stream and the underlying descriptor. */
-    fclose(file);
-    ARROW_LOG(FATAL) << "unlink error";
-    return -1;
-  }
-  if (ftruncate(fd, (off_t)size) != 0) {
-    /* fclose() releases both the stream and the underlying descriptor. */
-    fclose(file);
-    ARROW_LOG(FATAL) << "ftruncate error";
-    return -1;
-  }
-#endif
-  return fd;
-}
-
-/* mmap replacement handed to dlmalloc: backs every segment with a freshly
- * created buffer file and records the mapping in mmap_records.
- *
- * @param size Number of bytes requested.
- * @return Pointer sizeof(size_t) bytes past the start of the new mapping, or
- *         MAP_FAILED if mmap() failed. */
-void* fake_mmap(size_t size) {
-  /* Add sizeof(size_t) so that the returned pointer is deliberately not
-   * page-aligned. This ensures that the segments of memory returned by
-   * fake_mmap are never contiguous. */
-  size += sizeof(size_t);
-
-  int fd = create_buffer(size);
-  ARROW_CHECK(fd >= 0) << "Failed to create buffer during mmap";
-  void* pointer = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
-  if (pointer == MAP_FAILED) {
-    /* No record is kept for a failed mapping, so nobody else would ever close
-     * the backing descriptor. */
-    close(fd);
-    return pointer;
-  }
-
-  /* Increase dlmalloc's allocation granularity directly. */
-  mparams.granularity *= GRANULARITY_MULTIPLIER;
-
-  /* Remember the real mapping address together with its fd and full size. */
-  mmap_record& record = mmap_records[pointer];
-  record.fd = fd;
-  record.size = size;
-
-  /* We lie to dlmalloc about where mapped memory actually lives. */
-  pointer = pointer_advance(pointer, sizeof(size_t));
-  ARROW_LOG(DEBUG) << pointer << " = fake_mmap(" << size << ")";
-  return pointer;
-}
-
-/* munmap replacement handed to dlmalloc: undoes exactly one earlier
- * fake_mmap() call.
- *
- * @param addr Pointer previously returned by fake_mmap().
- * @param size Size that was originally requested from fake_mmap().
- * @return 0 on success; -1 if (addr, size) does not match a recorded mapping;
- *         otherwise the non-zero result of munmap(). */
-int fake_munmap(void* addr, int64_t size) {
-  ARROW_LOG(DEBUG) << "fake_munmap(" << addr << ", " << size << ")";
-  /* Undo the offset and the padding that fake_mmap() applied. */
-  addr = pointer_retreat(addr, sizeof(size_t));
-  size += sizeof(size_t);
-
-  auto entry = mmap_records.find(addr);
-
-  if (entry == mmap_records.end() || entry->second.size != size) {
-    /* Reject requests to munmap that don't directly match previous
-     * calls to mmap, to prevent dlmalloc from trimming. */
-    return -1;
-  }
-
-  int r = munmap(addr, size);
-  if (r != 0) {
-    /* The mapping is still in place: keep its record (and its open fd) so
-     * that it can still be looked up and released later. */
-    return r;
-  }
-
-  close(entry->second.fd);
-  mmap_records.erase(entry);
-  return 0;
-}
-
-/* Look up the recorded segment that contains `addr`.
- *
- * @param addr Address to look up.
- * @param fd Output: file descriptor of the containing segment, or -1 if no
- *        recorded segment contains `addr`.
- * @param map_size Output: recorded size of that segment, or 0 if not found.
- * @param offset Output: byte offset of `addr` from the start of the segment,
- *        or 0 if not found. */
-void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_size, ptrdiff_t* offset) {
-  /* TODO(rshin): Implement a more efficient search through mmap_records. */
-  auto it = mmap_records.begin();
-  for (; it != mmap_records.end(); ++it) {
-    void* segment_begin = it->first;
-    if (addr < segment_begin) { continue; }
-    if (addr < pointer_advance(segment_begin, it->second.size)) { break; }
-  }
-
-  if (it == mmap_records.end()) {
-    /* No segment contains addr. */
-    *fd = -1;
-    *map_size = 0;
-    *offset = 0;
-    return;
-  }
-
-  *fd = it->second.fd;
-  *map_size = it->second.size;
-  *offset = pointer_distance(it->first, addr);
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/malloc.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/malloc.h b/cpp/src/plasma/malloc.h
deleted file mode 100644
index b4af2c8..0000000
--- a/cpp/src/plasma/malloc.h
+++ /dev/null
@@ -1,26 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_MALLOC_H
-#define PLASMA_MALLOC_H
-
-#include <inttypes.h>
-#include <stddef.h>
-
-void get_malloc_mapinfo(void* addr, int* fd, int64_t* map_length, ptrdiff_t* offset);
-
-#endif  // PLASMA_MALLOC_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/plasma.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.cc b/cpp/src/plasma/plasma.cc
deleted file mode 100644
index 559d8e7..0000000
--- a/cpp/src/plasma/plasma.cc
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/plasma.h"
-
-#include <sys/socket.h>
-#include <sys/types.h>
-#include <unistd.h>
-
-#include "plasma/common.h"
-#include "plasma/protocol.h"
-
-// Classify the result of sending a message to a client socket.
-//
-// Returns 0 when status is non-negative. When status is negative and errno is
-// EPIPE, EBADF or ECONNRESET, a warning naming client_sock is logged and that
-// errno value is returned. Any other errno is reported via ARROW_LOG(FATAL).
-int warn_if_sigpipe(int status, int client_sock) {
-  if (status >= 0) { return 0; }
-  // Capture errno once, before any logging: the log statements below may
-  // perform I/O of their own and overwrite it before it is returned.
-  const int saved_errno = errno;
-  if (saved_errno == EPIPE || saved_errno == EBADF || saved_errno == ECONNRESET) {
-    ARROW_LOG(WARNING) << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when "
-                          "sending a message to client on fd "
-                       << client_sock << ". The client on the other end may "
-                                         "have hung up.";
-    return saved_errno;
-  }
-  ARROW_LOG(FATAL) << "Failed to write message to client on fd " << client_sock << ".";
-  return -1;  // This is never reached.
-}
-
-/**
- * This will create a new ObjectInfo buffer. The first sizeof(int64_t) bytes
- * of this buffer are the length of the remaining message and the
- * remaining message is a serialized version of the object info.
- *
- * @param object_info The object info to be serialized
- * @return The object info buffer. It is allocated with "new[]", so it is the
- *         caller's responsibility to free it with "delete[]" after it has
- *         been used.
- */
-uint8_t* create_object_info_buffer(ObjectInfoT* object_info) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto message = CreateObjectInfo(fbb, object_info);
-  fbb.Finish(message);
-  // The length prefix is an int64_t in host byte order.
-  const int64_t payload_size = fbb.GetSize();
-  uint8_t* notification = new uint8_t[sizeof(int64_t) + payload_size];
-  // Copy the prefix byte-wise instead of storing through a casted pointer.
-  memcpy(notification, &payload_size, sizeof(payload_size));
-  memcpy(notification + sizeof(int64_t), fbb.GetBufferPointer(), payload_size);
-  return notification;
-}
-
-// Look up object_id in store_info->objects. Returns the raw pointer held by
-// the matching table slot, or NULL when the id is not in the table.
-ObjectTableEntry* get_object_table_entry(
-    PlasmaStoreInfo* store_info, const ObjectID& object_id) {
-  auto& table = store_info->objects;
-  auto found = table.find(object_id);
-  if (found != table.end()) { return found->second.get(); }
-  return NULL;
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/plasma.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
deleted file mode 100644
index 275d0c7..0000000
--- a/cpp/src/plasma/plasma.h
+++ /dev/null
@@ -1,191 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_PLASMA_H
-#define PLASMA_PLASMA_H
-
-#include <errno.h>
-#include <inttypes.h>
-#include <stdbool.h>
-#include <stddef.h>
-#include <stdio.h>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>  // pid_t
-
-#include <unordered_map>
-#include <unordered_set>
-
-#include "arrow/status.h"
-#include "arrow/util/logging.h"
-#include "format/common_generated.h"
-#include "plasma/common.h"
-
-/// Evaluate the Status expression `s` once. If it is not ok and errno is
-/// EPIPE, EBADF or ECONNRESET, log a warning that mentions `fd_` and carry on;
-/// for any other failure, return the Status from the enclosing function.
-/// NOTE(review): the expansion ends in "while (0);", i.e. the macro supplies
-/// its own statement terminator, so a call followed by ';' inside an unbraced
-/// if/else expands to two statements.
-#define HANDLE_SIGPIPE(s, fd_)                                              \
-  do {                                                                      \
-    Status _s = (s);                                                        \
-    if (!_s.ok()) {                                                         \
-      if (errno == EPIPE || errno == EBADF || errno == ECONNRESET) {        \
-        ARROW_LOG(WARNING)                                                  \
-            << "Received SIGPIPE, BAD FILE DESCRIPTOR, or ECONNRESET when " \
-               "sending a message to client on fd "                         \
-            << (fd_) << ". "                                                \
-                        "The client on the other end may have hung up.";    \
-      } else {                                                              \
-        return _s;                                                          \
-      }                                                                     \
-    }                                                                       \
-  } while (0);
-
-/// Allocation granularity used in plasma for object allocation.
-#define BLOCK_SIZE 64
-
-/// Size of object hash digests.
-constexpr int64_t kDigestSize = sizeof(uint64_t);
-
-struct Client;
-
-/// Object request data structure. Used in the plasma_wait_for_objects()
-/// argument.
-/// The member order (object_id, type, status) is significant: the struct has
-/// no constructor, so code that brace-initializes it relies on this order.
-typedef struct {
-  /// The ID of the requested object. If ID_NIL request any object.
-  ObjectID object_id;
-  /// Request associated to the object. It can take one of the following values:
-  ///  - PLASMA_QUERY_LOCAL: return if or when the object is available in the
-  ///    local Plasma Store.
-  ///  - PLASMA_QUERY_ANYWHERE: return if or when the object is available in
-  ///    the system (i.e., either in the local or a remote Plasma Store).
-  int type;
-  /// Object status. Same as the status returned by plasma_status() function
-  /// call. This is filled in by plasma_wait_for_objects1():
-  ///  - ObjectStatus_Local: object is ready at the local Plasma Store.
-  ///  - ObjectStatus_Remote: object is ready at a remote Plasma Store.
-  ///  - ObjectStatus_Nonexistent: object does not exist in the system.
-  ///  - PLASMA_CLIENT_IN_TRANSFER, if the object is currently being scheduled
-  ///    for being transferred or it is transferring.
-  int status;
-} ObjectRequest;
-
-/// Mapping from object IDs to type and status of the request.
-typedef std::unordered_map<ObjectID, ObjectRequest, UniqueIDHasher> ObjectRequestMap;
-
-/// Handle to access memory mapped file and map it into client address space.
-/// Both members are plain values; the struct owns no resources itself.
-typedef struct {
-  /// The file descriptor of the memory mapped file in the store. It is used as
-  /// a unique identifier of the file in the client to look up the corresponding
-  /// file descriptor on the client's side.
-  int store_fd;
-  /// The size in bytes of the memory mapped file.
-  int64_t mmap_size;
-} object_handle;
-
-// TODO(pcm): Replace this by the flatbuffers message PlasmaObjectSpec.
-/// Describes where an object's data and metadata live inside a memory mapped
-/// file: the file's handle plus the offset and size of each of the two parts.
-typedef struct {
-  /// Handle for memory mapped file the object is stored in.
-  object_handle handle;
-  /// The offset in bytes in the memory mapped file of the data.
-  ptrdiff_t data_offset;
-  /// The offset in bytes in the memory mapped file of the metadata.
-  ptrdiff_t metadata_offset;
-  /// The size in bytes of the data.
-  int64_t data_size;
-  /// The size in bytes of the metadata.
-  int64_t metadata_size;
-} PlasmaObject;
-
-/// State of an object in the local Plasma Store. Values start at 1, so
-/// PLASMA_SEALED is implicitly 2.
-typedef enum {
-  /// Object was created but not sealed in the local Plasma Store.
-  PLASMA_CREATED = 1,
-  /// Object is sealed and stored in the local Plasma Store.
-  PLASMA_SEALED
-} object_state;
-
-/// Outcome of looking for an object; the two values are explicitly 0 and 1.
-typedef enum {
-  /// The object was not found.
-  OBJECT_NOT_FOUND = 0,
-  /// The object was found.
-  OBJECT_FOUND = 1
-} object_status;
-
-/// Scope of an object query. Values start at 1, so PLASMA_QUERY_ANYWHERE is
-/// implicitly 2.
-typedef enum {
-  /// Query for object in the local plasma store.
-  PLASMA_QUERY_LOCAL = 1,
-  /// Query for object in the local plasma store or in a remote plasma store.
-  PLASMA_QUERY_ANYWHERE
-} object_request_type;
-
-/// This type is used by the Plasma store. It is here because it is exposed to
-/// the eviction policy.
-struct ObjectTableEntry {
-  /// Object id of this object.
-  ObjectID object_id;
-  /// Object info like size, creation time and owner.
-  ObjectInfoT info;
-  /// Memory mapped file containing the object.
-  int fd;
-  /// Size of the underlying map.
-  int64_t map_size;
-  /// Offset from the base of the mmap.
-  ptrdiff_t offset;
-  /// Pointer to the object data. Needed to free the object.
-  uint8_t* pointer;
-  /// Set of clients currently using this object.
-  /// The set stores raw Client pointers; Client is only forward-declared here.
-  std::unordered_set<Client*> clients;
-  /// The state of the object, e.g., whether it is open or sealed.
-  object_state state;
-  /// The digest of the object. Used to see if two objects are the same.
-  /// The array holds kDigestSize bytes.
-  unsigned char digest[kDigestSize];
-};
-
-/// The plasma store information that is exposed to the eviction policy.
-/// NOTE(review): std::unique_ptr is used below but this header does not
-/// include <memory> itself; it presumably arrives through another include.
-struct PlasmaStoreInfo {
-  /// Objects that are in the Plasma store.
-  /// The map owns each entry through a std::unique_ptr.
-  std::unordered_map<ObjectID, std::unique_ptr<ObjectTableEntry>, UniqueIDHasher> objects;
-  /// The amount of memory (in bytes) that we allow to be allocated in the
-  /// store.
-  int64_t memory_capacity;
-};
-
-/// Get an entry from the object table and return NULL if the object_id
-/// is not present.
-///
-/// @param store_info The PlasmaStoreInfo that contains the object table.
-/// @param object_id The object_id of the entry we are looking for.
-/// @return The entry associated with the object_id or NULL if the object_id
-///         is not present.
-ObjectTableEntry* get_object_table_entry(
-    PlasmaStoreInfo* store_info, const ObjectID& object_id);
-
-/// Print a warning if the status is less than zero. This should be used to check
-/// the success of messages sent to plasma clients. We print a warning instead of
-/// failing because the plasma clients are allowed to die. This is used to handle
-/// situations where the store writes to a client file descriptor, and the client
-/// may already have disconnected. If we have processed the disconnection and
-/// closed the file descriptor, we should get a BAD FILE DESCRIPTOR error. If we
-/// have not, then we should get a SIGPIPE. If we write to a TCP socket that
-/// isn't connected yet, then we should get an ECONNRESET.
-///
-/// @param status The status to check. If it is less than zero, we will
-///        print a warning.
-/// @param client_sock The client socket. This is just used to print some extra
-///        information.
-/// @return The errno set.
-int warn_if_sigpipe(int status, int client_sock);
-
-uint8_t* create_object_info_buffer(ObjectInfoT* object_info);
-
-#endif  // PLASMA_PLASMA_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/protocol.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.cc b/cpp/src/plasma/protocol.cc
deleted file mode 100644
index 246aa29..0000000
--- a/cpp/src/plasma/protocol.cc
+++ /dev/null
@@ -1,502 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "plasma/protocol.h"
-
-#include "flatbuffers/flatbuffers.h"
-#include "format/plasma_generated.h"
-
-#include "plasma/common.h"
-#include "plasma/io.h"
-
-using flatbuffers::uoffset_t;
-
-// Serialize num_objects object IDs into fbb as a vector of strings, one string
-// per ID, each built from the ID's binary() representation.
-flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<flatbuffers::String>>>
-to_flatbuffer(flatbuffers::FlatBufferBuilder* fbb, const ObjectID* object_ids,
-    int64_t num_objects) {
-  std::vector<flatbuffers::Offset<flatbuffers::String>> id_strings;
-  int64_t index = 0;
-  while (index < num_objects) {
-    auto id_string = fbb->CreateString(object_ids[index].binary());
-    id_strings.push_back(id_string);
-    ++index;
-  }
-  return fbb->CreateVector(id_strings);
-}
-
-// Read one message from sock into buffer and check that the type read equals
-// message_type. The status of ReadMessage goes through RETURN_NOT_OK; a type
-// mismatch trips ARROW_CHECK.
-Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer) {
-  int64_t received_type;
-  RETURN_NOT_OK(ReadMessage(sock, &received_type, buffer));
-  ARROW_CHECK(received_type == message_type)
-      << "type = " << received_type << ", message_type = " << message_type;
-  return Status::OK();
-}
-
-// Finish the buffer under construction in fbb with `message` as its root, then
-// write the finished bytes to sock via WriteMessage, tagged with message_type.
-template <typename Message>
-Status PlasmaSend(int sock, int64_t message_type, flatbuffers::FlatBufferBuilder* fbb,
-    const Message& message) {
-  fbb->Finish(message);
-  const auto length = fbb->GetSize();
-  auto* bytes = fbb->GetBufferPointer();
-  return WriteMessage(sock, message_type, length, bytes);
-}
-
-// Create messages.
-
-// Build a PlasmaCreateRequest carrying the object ID and the two sizes, and
-// send it on sock.
-Status SendCreateRequest(
-    int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto id = fbb.CreateString(object_id.binary());
-  auto request = CreatePlasmaCreateRequest(fbb, id, data_size, metadata_size);
-  return PlasmaSend(sock, MessageType_PlasmaCreateRequest, &fbb, request);
-}
-
-// Decode a PlasmaCreateRequest from data into the three output parameters.
-// Always returns Status::OK().
-Status ReadCreateRequest(
-    uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaCreateRequest>(data);
-  *data_size = request->data_size();
-  *metadata_size = request->metadata_size();
-  auto id_bytes = request->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  return Status::OK();
-}
-
-// Build a PlasmaCreateReply from the object ID, the fields of *object (packed
-// into a PlasmaObjectSpec) and error_code cast to PlasmaError; send it on sock.
-Status SendCreateReply(
-    int sock, ObjectID object_id, PlasmaObject* object, int error_code) {
-  flatbuffers::FlatBufferBuilder fbb;
-  const object_handle& handle = object->handle;
-  PlasmaObjectSpec spec(handle.store_fd, handle.mmap_size, object->data_offset,
-      object->data_size, object->metadata_offset, object->metadata_size);
-  auto id = fbb.CreateString(object_id.binary());
-  auto reply =
-      CreatePlasmaCreateReply(fbb, id, &spec, static_cast<PlasmaError>(error_code));
-  return PlasmaSend(sock, MessageType_PlasmaCreateReply, &fbb, reply);
-}
-
-// Decode a PlasmaCreateReply from data. Fills *object_id and every field of
-// *object (store_fd is taken from the spec's segment_index), then returns the
-// status derived from the reply's error code by plasma_error_status.
-Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaCreateReply>(data);
-  auto id_bytes = reply->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-
-  auto spec = reply->plasma_object();
-  object->handle.store_fd = spec->segment_index();
-  object->handle.mmap_size = spec->mmap_size();
-  object->data_offset = spec->data_offset();
-  object->data_size = spec->data_size();
-  object->metadata_offset = spec->metadata_offset();
-  object->metadata_size = spec->metadata_size();
-
-  return plasma_error_status(reply->error());
-}
-
-// Seal messages.
-
-// Build a PlasmaSealRequest from the object ID and the kDigestSize digest
-// bytes, and send it on sock. The digest string is created before the ID
-// string.
-Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest) {
-  flatbuffers::FlatBufferBuilder fbb;
-  const char* digest_bytes = reinterpret_cast<char*>(digest);
-  auto digest_string = fbb.CreateString(digest_bytes, kDigestSize);
-  auto id = fbb.CreateString(object_id.binary());
-  auto request = CreatePlasmaSealRequest(fbb, id, digest_string);
-  return PlasmaSend(sock, MessageType_PlasmaSealRequest, &fbb, request);
-}
-
-// Decode a PlasmaSealRequest from data: stores the object ID in *object_id and
-// copies kDigestSize digest bytes into digest. The digest length carried by
-// the message is verified with ARROW_CHECK before the copy.
-Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaSealRequest>(data);
-  auto id_bytes = request->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  auto received_digest = request->digest();
-  ARROW_CHECK(received_digest->size() == kDigestSize);
-  memcpy(digest, received_digest->data(), kDigestSize);
-  return Status::OK();
-}
-
-Status SendSealReply(int sock, ObjectID object_id, int error) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto message = CreatePlasmaSealReply(
-      fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
-  return PlasmaSend(sock, MessageType_PlasmaSealReply, &fbb, message);
-}
-
-// Decode a PlasmaSealReply from data: stores the object ID in *object_id and
-// returns the status derived from the reply's error code.
-Status ReadSealReply(uint8_t* data, ObjectID* object_id) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaSealReply>(data);
-  auto id_bytes = reply->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  return plasma_error_status(reply->error());
-}
-
-// Release messages.
-
-// Build a PlasmaReleaseRequest carrying the object ID and send it on sock.
-// The message is built with the Release builder so that its table type matches
-// MessageType_PlasmaReleaseRequest and the PlasmaReleaseRequest root that
-// ReadReleaseRequest parses (previously the Seal builder was used here).
-Status SendReleaseRequest(int sock, ObjectID object_id) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto message = CreatePlasmaReleaseRequest(fbb, fbb.CreateString(object_id.binary()));
-  return PlasmaSend(sock, MessageType_PlasmaReleaseRequest, &fbb, message);
-}
-
-// Decode a PlasmaReleaseRequest from data and store its object ID in
-// *object_id. Always returns Status::OK().
-Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaReleaseRequest>(data);
-  auto id_bytes = request->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  return Status::OK();
-}
-
-Status SendReleaseReply(int sock, ObjectID object_id, int error) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto message = CreatePlasmaReleaseReply(
-      fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
-  return PlasmaSend(sock, MessageType_PlasmaReleaseReply, &fbb, message);
-}
-
-// Decode a PlasmaReleaseReply from data: stores the object ID in *object_id
-// and returns the status derived from the reply's error code.
-Status ReadReleaseReply(uint8_t* data, ObjectID* object_id) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaReleaseReply>(data);
-  auto id_bytes = reply->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  return plasma_error_status(reply->error());
-}
-
-// Delete messages.
-
-// Build a PlasmaDeleteRequest carrying the object ID and send it on sock.
-Status SendDeleteRequest(int sock, ObjectID object_id) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto id = fbb.CreateString(object_id.binary());
-  auto request = CreatePlasmaDeleteRequest(fbb, id);
-  return PlasmaSend(sock, MessageType_PlasmaDeleteRequest, &fbb, request);
-}
-
-// Decode a PlasmaDeleteRequest from data and store its object ID in
-// *object_id. Always returns Status::OK().
-// The buffer is parsed as PlasmaDeleteRequest, the table SendDeleteRequest
-// builds (previously it was parsed as PlasmaReleaseReply).
-Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id) {
-  DCHECK(data);
-  auto message = flatbuffers::GetRoot<PlasmaDeleteRequest>(data);
-  *object_id = ObjectID::from_binary(message->object_id()->str());
-  return Status::OK();
-}
-
-Status SendDeleteReply(int sock, ObjectID object_id, int error) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto message = CreatePlasmaDeleteReply(
-      fbb, fbb.CreateString(object_id.binary()), (PlasmaError)error);
-  return PlasmaSend(sock, MessageType_PlasmaDeleteReply, &fbb, message);
-}
-
-// Decode a PlasmaDeleteReply from data: stores the object ID in *object_id and
-// returns the status derived from the reply's error code.
-Status ReadDeleteReply(uint8_t* data, ObjectID* object_id) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaDeleteReply>(data);
-  auto id_bytes = reply->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  return plasma_error_status(reply->error());
-}
-
-// Status messages.
-
-// Build a PlasmaStatusRequest listing num_objects object IDs and send it on
-// sock.
-Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto ids = to_flatbuffer(&fbb, object_ids, num_objects);
-  auto request = CreatePlasmaStatusRequest(fbb, ids);
-  return PlasmaSend(sock, MessageType_PlasmaStatusRequest, &fbb, request);
-}
-
-// Decode the first num_objects object IDs of a PlasmaStatusRequest into
-// object_ids. num_objects is not compared against the number of IDs in the
-// message. Always returns Status::OK().
-Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaStatusRequest>(data);
-  uoffset_t i = 0;
-  while (i < num_objects) {
-    auto id_bytes = request->object_ids()->Get(i)->str();
-    object_ids[i] = ObjectID::from_binary(id_bytes);
-    ++i;
-  }
-  return Status::OK();
-}
-
-// Build a PlasmaStatusReply holding num_objects object IDs and the matching
-// num_objects entries of object_status, and send it on sock.
-Status SendStatusReply(
-    int sock, ObjectID object_ids[], int object_status[], int64_t num_objects) {
-  flatbuffers::FlatBufferBuilder fbb;
-  // Both vectors are built inside the argument list, so the order in which
-  // they are written into fbb is left to the compiler.
-  auto message =
-      CreatePlasmaStatusReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
-          fbb.CreateVector(object_status, num_objects));
-  return PlasmaSend(sock, MessageType_PlasmaStatusReply, &fbb, message);
-}
-
-// Return the number of object IDs stored in the PlasmaStatusReply at data.
-int64_t ReadStatusReply_num_objects(uint8_t* data) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaStatusReply>(data);
-  auto ids = reply->object_ids();
-  return ids->size();
-}
-
-// Decode the first num_objects object IDs and status values of a
-// PlasmaStatusReply into object_ids and object_status. All IDs are decoded
-// before any status value is copied. Always returns Status::OK().
-Status ReadStatusReply(
-    uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaStatusReply>(data);
-
-  uoffset_t i = 0;
-  while (i < num_objects) {
-    auto id_bytes = reply->object_ids()->Get(i)->str();
-    object_ids[i] = ObjectID::from_binary(id_bytes);
-    ++i;
-  }
-
-  uoffset_t j = 0;
-  while (j < num_objects) {
-    object_status[j] = reply->status()->data()[j];
-    ++j;
-  }
-  return Status::OK();
-}
-
-// Contains messages.
-
-// Build a PlasmaContainsRequest carrying the object ID and send it on sock.
-Status SendContainsRequest(int sock, ObjectID object_id) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto id = fbb.CreateString(object_id.binary());
-  auto request = CreatePlasmaContainsRequest(fbb, id);
-  return PlasmaSend(sock, MessageType_PlasmaContainsRequest, &fbb, request);
-}
-
-// Decode a PlasmaContainsRequest from data and store its object ID in
-// *object_id. Always returns Status::OK().
-Status ReadContainsRequest(uint8_t* data, ObjectID* object_id) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaContainsRequest>(data);
-  auto id_bytes = request->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  return Status::OK();
-}
-
-// Build a PlasmaContainsReply from the object ID and the has_object flag, and
-// send it on sock.
-Status SendContainsReply(int sock, ObjectID object_id, bool has_object) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto id = fbb.CreateString(object_id.binary());
-  auto reply = CreatePlasmaContainsReply(fbb, id, has_object);
-  return PlasmaSend(sock, MessageType_PlasmaContainsReply, &fbb, reply);
-}
-
-// Decode a PlasmaContainsReply from data into *object_id and *has_object.
-// Always returns Status::OK().
-Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaContainsReply>(data);
-  auto id_bytes = reply->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  *has_object = reply->has_object();
-  return Status::OK();
-}
-
-// Connect messages.
-
-// Build a PlasmaConnectRequest (it carries no fields) and send it on sock.
-Status SendConnectRequest(int sock) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto request = CreatePlasmaConnectRequest(fbb);
-  const int64_t type = MessageType_PlasmaConnectRequest;
-  return PlasmaSend(sock, type, &fbb, request);
-}
-
-// Counterpart of SendConnectRequest. Nothing is decoded: data is not inspected
-// and the function always returns Status::OK().
-Status ReadConnectRequest(uint8_t* data) {
-  return Status::OK();
-}
-
-// Build a PlasmaConnectReply carrying memory_capacity and send it on sock.
-Status SendConnectReply(int sock, int64_t memory_capacity) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto reply = CreatePlasmaConnectReply(fbb, memory_capacity);
-  const int64_t type = MessageType_PlasmaConnectReply;
-  return PlasmaSend(sock, type, &fbb, reply);
-}
-
-// Decode a PlasmaConnectReply from data and store its memory capacity in
-// *memory_capacity. Always returns Status::OK().
-Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaConnectReply>(data);
-  const auto capacity = reply->memory_capacity();
-  *memory_capacity = capacity;
-  return Status::OK();
-}
-
-// Evict messages.
-
-// Build a PlasmaEvictRequest carrying num_bytes and send it on sock.
-Status SendEvictRequest(int sock, int64_t num_bytes) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto request = CreatePlasmaEvictRequest(fbb, num_bytes);
-  const int64_t type = MessageType_PlasmaEvictRequest;
-  return PlasmaSend(sock, type, &fbb, request);
-}
-
-// Decode a PlasmaEvictRequest from data and store its byte count in
-// *num_bytes. Always returns Status::OK().
-Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaEvictRequest>(data);
-  const auto requested = request->num_bytes();
-  *num_bytes = requested;
-  return Status::OK();
-}
-
-// Build a PlasmaEvictReply carrying num_bytes and send it on sock.
-Status SendEvictReply(int sock, int64_t num_bytes) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto reply = CreatePlasmaEvictReply(fbb, num_bytes);
-  const int64_t type = MessageType_PlasmaEvictReply;
-  return PlasmaSend(sock, type, &fbb, reply);
-}
-
-// Decode a PlasmaEvictReply from data and store its byte count in num_bytes
-// (an output reference, unlike the pointer outputs of the sibling readers).
-// Always returns Status::OK().
-Status ReadEvictReply(uint8_t* data, int64_t& num_bytes) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaEvictReply>(data);
-  const auto evicted = reply->num_bytes();
-  num_bytes = evicted;
-  return Status::OK();
-}
-
-// Get messages.
-
-// Build a PlasmaGetRequest listing num_objects object IDs together with
-// timeout_ms, and send it on sock.
-Status SendGetRequest(
-    int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto ids = to_flatbuffer(&fbb, object_ids, num_objects);
-  auto request = CreatePlasmaGetRequest(fbb, ids, timeout_ms);
-  return PlasmaSend(sock, MessageType_PlasmaGetRequest, &fbb, request);
-}
-
-// Decode a PlasmaGetRequest from data: every object ID in the message is
-// appended to object_ids (existing elements are kept) and the timeout is
-// stored in *timeout_ms. Always returns Status::OK().
-Status ReadGetRequest(
-    uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaGetRequest>(data);
-  uoffset_t i = 0;
-  while (i < request->object_ids()->size()) {
-    auto id_bytes = request->object_ids()->Get(i)->str();
-    object_ids.push_back(ObjectID::from_binary(id_bytes));
-    ++i;
-  }
-  *timeout_ms = request->timeout_ms();
-  return Status::OK();
-}
-
-// Build a PlasmaGetReply holding num_objects object IDs and one
-// PlasmaObjectSpec per ID, and send it on sock. Each spec is read from
-// plasma_objects with operator[], so an ID missing from the map gets a
-// default-constructed PlasmaObject inserted.
-Status SendGetReply(int sock, ObjectID object_ids[],
-    std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
-    int64_t num_objects) {
-  flatbuffers::FlatBufferBuilder fbb;
-  std::vector<PlasmaObjectSpec> specs;
-
-  for (int i = 0; i < num_objects; ++i) {
-    const PlasmaObject& entry = plasma_objects[object_ids[i]];
-    const object_handle& handle = entry.handle;
-    PlasmaObjectSpec spec(handle.store_fd, handle.mmap_size, entry.data_offset,
-        entry.data_size, entry.metadata_offset, entry.metadata_size);
-    specs.push_back(spec);
-  }
-  // Both vectors are built inside the argument list, as in the original, so
-  // the order in which they are written into fbb is left to the compiler.
-  auto reply = CreatePlasmaGetReply(fbb, to_flatbuffer(&fbb, object_ids, num_objects),
-      fbb.CreateVectorOfStructs(specs.data(), num_objects));
-  return PlasmaSend(sock, MessageType_PlasmaGetReply, &fbb, reply);
-}
-
-// Decode the first num_objects entries of a PlasmaGetReply into object_ids and
-// plasma_objects (store_fd is taken from each spec's segment_index). All IDs
-// are decoded before any object spec. Always returns Status::OK().
-Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
-    int64_t num_objects) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaGetReply>(data);
-
-  uoffset_t i = 0;
-  while (i < num_objects) {
-    auto id_bytes = reply->object_ids()->Get(i)->str();
-    object_ids[i] = ObjectID::from_binary(id_bytes);
-    ++i;
-  }
-
-  uoffset_t j = 0;
-  while (j < num_objects) {
-    const PlasmaObjectSpec* spec = reply->plasma_objects()->Get(j);
-    PlasmaObject& out = plasma_objects[j];
-    out.handle.store_fd = spec->segment_index();
-    out.handle.mmap_size = spec->mmap_size();
-    out.data_offset = spec->data_offset();
-    out.data_size = spec->data_size();
-    out.metadata_offset = spec->metadata_offset();
-    out.metadata_size = spec->metadata_size();
-    ++j;
-  }
-  return Status::OK();
-}
-
-// Fetch messages.
-
-// Build a PlasmaFetchRequest listing num_objects object IDs and send it on
-// sock.
-Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto ids = to_flatbuffer(&fbb, object_ids, num_objects);
-  auto request = CreatePlasmaFetchRequest(fbb, ids);
-  return PlasmaSend(sock, MessageType_PlasmaFetchRequest, &fbb, request);
-}
-
-// Decode a PlasmaFetchRequest from data: every object ID in the message is
-// appended to object_ids (existing elements are kept). Always returns
-// Status::OK().
-Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaFetchRequest>(data);
-  uoffset_t i = 0;
-  while (i < request->object_ids()->size()) {
-    auto id_bytes = request->object_ids()->Get(i)->str();
-    object_ids.push_back(ObjectID::from_binary(id_bytes));
-    ++i;
-  }
-  return Status::OK();
-}
-
-// Wait messages.
-
-// Build a PlasmaWaitRequest from num_requests entries of object_requests (the
-// ID and type of each), num_ready_objects and timeout_ms; send it on sock.
-Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
-    int num_ready_objects, int64_t timeout_ms) {
-  flatbuffers::FlatBufferBuilder fbb;
-
-  std::vector<flatbuffers::Offset<ObjectRequestSpec>> specs;
-  for (int i = 0; i < num_requests; i++) {
-    const ObjectRequest& wanted = object_requests[i];
-    auto id = fbb.CreateString(wanted.object_id.binary());
-    specs.push_back(CreateObjectRequestSpec(fbb, id, wanted.type));
-  }
-
-  auto spec_vector = fbb.CreateVector(specs);
-  auto request =
-      CreatePlasmaWaitRequest(fbb, spec_vector, num_ready_objects, timeout_ms);
-  return PlasmaSend(sock, MessageType_PlasmaWaitRequest, &fbb, request);
-}
-
-// Decode a PlasmaWaitRequest from data. Stores the ready-object count and the
-// timeout in the output pointers, then records one ObjectRequest per entry in
-// object_requests, keyed by object ID, with status ObjectStatus_Nonexistent.
-// Always returns Status::OK().
-Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
-    int64_t* timeout_ms, int* num_ready_objects) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaWaitRequest>(data);
-  *num_ready_objects = request->num_ready_objects();
-  *timeout_ms = request->timeout();
-
-  uoffset_t i = 0;
-  while (i < request->object_requests()->size()) {
-    auto spec = request->object_requests()->Get(i);
-    ObjectID object_id = ObjectID::from_binary(spec->object_id()->str());
-    ObjectRequest entry({object_id, spec->type(), ObjectStatus_Nonexistent});
-    object_requests[object_id] = entry;
-    i++;
-  }
-  return Status::OK();
-}
-
-// Build a PlasmaWaitReply and send it on sock. One ObjectReply is created for
-// every entry of object_requests, but only the first num_ready_objects of them
-// are placed in the reply vector; num_ready_objects is not checked against the
-// number of entries.
-Status SendWaitReply(
-    int sock, const ObjectRequestMap& object_requests, int num_ready_objects) {
-  flatbuffers::FlatBufferBuilder fbb;
-
-  std::vector<flatbuffers::Offset<ObjectReply>> replies;
-  for (const auto& entry : object_requests) {
-    const auto& answered = entry.second;
-    auto id = fbb.CreateString(answered.object_id.binary());
-    replies.push_back(CreateObjectReply(fbb, id, answered.status));
-  }
-
-  auto reply_vector = fbb.CreateVector(replies.data(), num_ready_objects);
-  auto reply = CreatePlasmaWaitReply(fbb, reply_vector, num_ready_objects);
-  return PlasmaSend(sock, MessageType_PlasmaWaitReply, &fbb, reply);
-}
-
-// Decode a PlasmaWaitReply from data. Stores the ready-object count in
-// *num_ready_objects and fills the object_id and status of that many leading
-// entries of object_requests. Always returns Status::OK().
-Status ReadWaitReply(
-    uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects) {
-  DCHECK(data);
-
-  auto reply = flatbuffers::GetRoot<PlasmaWaitReply>(data);
-  *num_ready_objects = reply->num_ready_objects();
-  int i = 0;
-  while (i < *num_ready_objects) {
-    auto answered = reply->object_requests()->Get(i);
-    object_requests[i].object_id = ObjectID::from_binary(answered->object_id()->str());
-    object_requests[i].status = reply->object_requests()->Get(i)->status();
-    i++;
-  }
-  return Status::OK();
-}
-
-// Subscribe messages.
-
-// Build a PlasmaSubscribeRequest (it carries no fields) and send it on sock.
-Status SendSubscribeRequest(int sock) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto request = CreatePlasmaSubscribeRequest(fbb);
-  const int64_t type = MessageType_PlasmaSubscribeRequest;
-  return PlasmaSend(sock, type, &fbb, request);
-}
-
-// Data messages.
-
-// Build a PlasmaDataRequest from the object ID, the NUL-terminated address
-// string and the port, and send it on sock. The address string is created
-// before the ID string.
-Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port) {
-  flatbuffers::FlatBufferBuilder fbb;
-  const size_t address_length = strlen(address);
-  auto addr = fbb.CreateString(address, address_length);
-  auto id = fbb.CreateString(object_id.binary());
-  auto request = CreatePlasmaDataRequest(fbb, id, addr, port);
-  return PlasmaSend(sock, MessageType_PlasmaDataRequest, &fbb, request);
-}
-
-// Decode a PlasmaDataRequest from data into *object_id, *address and *port.
-// *address receives a strdup'ed copy of the address string, so the caller is
-// responsible for releasing it with free(). Always returns Status::OK().
-Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port) {
-  DCHECK(data);
-  auto request = flatbuffers::GetRoot<PlasmaDataRequest>(data);
-  DCHECK(request->object_id()->size() == sizeof(ObjectID));
-  auto id_bytes = request->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  const char* address_text = request->address()->c_str();
-  *address = strdup(address_text);
-  *port = request->port();
-  return Status::OK();
-}
-
-// Build a PlasmaDataReply from the object ID and the two sizes, and send it on
-// sock.
-Status SendDataReply(
-    int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size) {
-  flatbuffers::FlatBufferBuilder fbb;
-  auto id = fbb.CreateString(object_id.binary());
-  auto reply = CreatePlasmaDataReply(fbb, id, object_size, metadata_size);
-  return PlasmaSend(sock, MessageType_PlasmaDataReply, &fbb, reply);
-}
-
-// Decode a PlasmaDataReply from data into *object_id, *object_size and
-// *metadata_size; the two sizes are converted to int64_t. Always returns
-// Status::OK().
-Status ReadDataReply(
-    uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size) {
-  DCHECK(data);
-  auto reply = flatbuffers::GetRoot<PlasmaDataReply>(data);
-  auto id_bytes = reply->object_id()->str();
-  *object_id = ObjectID::from_binary(id_bytes);
-  *object_size = static_cast<int64_t>(reply->object_size());
-  *metadata_size = static_cast<int64_t>(reply->metadata_size());
-  return Status::OK();
-}

http://git-wip-us.apache.org/repos/asf/arrow/blob/62ef2cd8/cpp/src/plasma/protocol.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/protocol.h b/cpp/src/plasma/protocol.h
deleted file mode 100644
index 5d9d136..0000000
--- a/cpp/src/plasma/protocol.h
+++ /dev/null
@@ -1,170 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PLASMA_PROTOCOL_H
-#define PLASMA_PROTOCOL_H
-
-#include <vector>
-
-#include "arrow/status.h"
-#include "format/plasma_generated.h"
-#include "plasma/plasma.h"
-
-using arrow::Status;
-
-/* Plasma receive message. */
-
-Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer);
-
-/* Plasma Create message functions. */
-
-Status SendCreateRequest(
-    int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size);
-
-Status ReadCreateRequest(
-    uint8_t* data, ObjectID* object_id, int64_t* data_size, int64_t* metadata_size);
-
-Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error);
-
-Status ReadCreateReply(uint8_t* data, ObjectID* object_id, PlasmaObject* object);
-
-/* Plasma Seal message functions. */
-
-Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
-
-Status ReadSealRequest(uint8_t* data, ObjectID* object_id, unsigned char* digest);
-
-Status SendSealReply(int sock, ObjectID object_id, int error);
-
-Status ReadSealReply(uint8_t* data, ObjectID* object_id);
-
-/* Plasma Get message functions. */
-
-Status SendGetRequest(
-    int sock, const ObjectID* object_ids, int64_t num_objects, int64_t timeout_ms);
-
-Status ReadGetRequest(
-    uint8_t* data, std::vector<ObjectID>& object_ids, int64_t* timeout_ms);
-
-Status SendGetReply(int sock, ObjectID object_ids[],
-    std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
-    int64_t num_objects);
-
-Status ReadGetReply(uint8_t* data, ObjectID object_ids[], PlasmaObject plasma_objects[],
-    int64_t num_objects);
-
-/* Plasma Release message functions. */
-
-Status SendReleaseRequest(int sock, ObjectID object_id);
-
-Status ReadReleaseRequest(uint8_t* data, ObjectID* object_id);
-
-Status SendReleaseReply(int sock, ObjectID object_id, int error);
-
-Status ReadReleaseReply(uint8_t* data, ObjectID* object_id);
-
-/* Plasma Delete message functions. */
-
-Status SendDeleteRequest(int sock, ObjectID object_id);
-
-Status ReadDeleteRequest(uint8_t* data, ObjectID* object_id);
-
-Status SendDeleteReply(int sock, ObjectID object_id, int error);
-
-Status ReadDeleteReply(uint8_t* data, ObjectID* object_id);
-
-/* Status messages. */
-
-Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
-
-Status ReadStatusRequest(uint8_t* data, ObjectID object_ids[], int64_t num_objects);
-
-Status SendStatusReply(
-    int sock, ObjectID object_ids[], int object_status[], int64_t num_objects);
-
-int64_t ReadStatusReply_num_objects(uint8_t* data);
-
-Status ReadStatusReply(
-    uint8_t* data, ObjectID object_ids[], int object_status[], int64_t num_objects);
-
-/* Plasma Contains message functions. */
-
-Status SendContainsRequest(int sock, ObjectID object_id);
-
-Status ReadContainsRequest(uint8_t* data, ObjectID* object_id);
-
-Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
-
-Status ReadContainsReply(uint8_t* data, ObjectID* object_id, bool* has_object);
-
-/* Plasma Connect message functions. */
-
-Status SendConnectRequest(int sock);
-
-Status ReadConnectRequest(uint8_t* data);
-
-Status SendConnectReply(int sock, int64_t memory_capacity);
-
-Status ReadConnectReply(uint8_t* data, int64_t* memory_capacity);
-
-/* Plasma Evict message functions. */
-
-Status SendEvictRequest(int sock, int64_t num_bytes);
-
-Status ReadEvictRequest(uint8_t* data, int64_t* num_bytes);
-
-Status SendEvictReply(int sock, int64_t num_bytes);
-
-Status ReadEvictReply(uint8_t* data, int64_t& num_bytes);
-
-/* Plasma Fetch Remote message functions. */
-
-Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
-
-Status ReadFetchRequest(uint8_t* data, std::vector<ObjectID>& object_ids);
-
-/* Plasma Wait message functions. */
-
-Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
-    int num_ready_objects, int64_t timeout_ms);
-
-Status ReadWaitRequest(uint8_t* data, ObjectRequestMap& object_requests,
-    int64_t* timeout_ms, int* num_ready_objects);
-
-Status SendWaitReply(
-    int sock, const ObjectRequestMap& object_requests, int num_ready_objects);
-
-Status ReadWaitReply(
-    uint8_t* data, ObjectRequest object_requests[], int* num_ready_objects);
-
-/* Plasma Subscribe message functions. */
-
-Status SendSubscribeRequest(int sock);
-
-/* Data messages. */
-
-Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port);
-
-Status ReadDataRequest(uint8_t* data, ObjectID* object_id, char** address, int* port);
-
-Status SendDataReply(
-    int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size);
-
-Status ReadDataReply(
-    uint8_t* data, ObjectID* object_id, int64_t* object_size, int64_t* metadata_size);
-
-#endif /* PLASMA_PROTOCOL_H */