You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2017/07/23 18:39:19 UTC
[12/14] arrow git commit: [C++] Restore Plasma source tree after
0.5.0 release
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/store.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
new file mode 100644
index 0000000..9394e3d
--- /dev/null
+++ b/cpp/src/plasma/store.cc
@@ -0,0 +1,683 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// PLASMA STORE: This is a simple object store server process
+//
+// It accepts incoming client connections on a unix domain socket
+// (name passed in via the -s option of the executable) and uses a
+// single thread to serve the clients. Each client establishes a
+// connection and can create objects, wait for objects and seal
+// objects through that connection.
+//
+// It keeps a hash table that maps object_ids (which are 20 bytes long,
+// just enough to store a SHA1 hash) to memory mapped files.
+
+#include "plasma/store.h"
+
+#include <assert.h>
+#include <fcntl.h>
+#include <getopt.h>
+#include <limits.h>
+#include <signal.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/ioctl.h>
+#include <sys/socket.h>
+#include <sys/statvfs.h>
+#include <sys/types.h>
+#include <sys/un.h>
+#include <unistd.h>
+
+#include <deque>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "format/common_generated.h"
+#include "plasma/common.h"
+#include "plasma/fling.h"
+#include "plasma/io.h"
+#include "plasma/malloc.h"
+
// C-linkage declarations of the dlmalloc allocator functions the store calls.
// NOTE(review): presumably implemented by the dlmalloc build configured through
// plasma/malloc.h — confirm.
extern "C" {
// Not referenced elsewhere in this file.
void* dlmalloc(size_t bytes);
// Used by create_object to allocate object buffers aligned to BLOCK_SIZE.
void* dlmemalign(size_t alignment, size_t bytes);
// Used by delete_objects to free an object's buffer.
void dlfree(void* mem);
// Used by main to cap the allocator's footprint at the -m memory limit so that
// requests beyond it fail.
size_t dlmalloc_set_footprint_limit(size_t bytes);
}
+
+struct GetRequest {
+ GetRequest(Client* client, const std::vector<ObjectID>& object_ids);
+ /// The client that called get.
+ Client* client;
+ /// The ID of the timer that will time out and cause this wait to return to
+ /// the client if it hasn't already returned.
+ int64_t timer;
+ /// The object IDs involved in this request. This is used in the reply.
+ std::vector<ObjectID> object_ids;
+ /// The object information for the objects in this request. This is used in
+ /// the reply.
+ std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> objects;
+ /// The minimum number of objects to wait for in this request.
+ int64_t num_objects_to_wait_for;
+ /// The number of object requests in this wait request that are already
+ /// satisfied.
+ int64_t num_satisfied;
+};
+
+GetRequest::GetRequest(Client* client, const std::vector<ObjectID>& object_ids)
+ : client(client),
+ timer(-1),
+ object_ids(object_ids.begin(), object_ids.end()),
+ objects(object_ids.size()),
+ num_satisfied(0) {
+ std::unordered_set<ObjectID, UniqueIDHasher> unique_ids(
+ object_ids.begin(), object_ids.end());
+ num_objects_to_wait_for = unique_ids.size();
+}
+
+Client::Client(int fd) : fd(fd) {}
+
+PlasmaStore::PlasmaStore(EventLoop* loop, int64_t system_memory)
+ : loop_(loop), eviction_policy_(&store_info_) {
+ store_info_.memory_capacity = system_memory;
+}
+
+// TODO(pcm): Get rid of this destructor by using RAII to clean up data.
+PlasmaStore::~PlasmaStore() {
+ for (const auto& element : pending_notifications_) {
+ auto object_notifications = element.second.object_notifications;
+ for (size_t i = 0; i < object_notifications.size(); ++i) {
+ uint8_t* notification = reinterpret_cast<uint8_t*>(object_notifications.at(i));
+ uint8_t* data = notification;
+ // TODO(pcm): Get rid of this delete.
+ delete[] data;
+ }
+ }
+}
+
+// If this client is not already using the object, add the client to the
+// object's list of clients, otherwise do nothing.
+void PlasmaStore::add_client_to_object_clients(ObjectTableEntry* entry, Client* client) {
+ // Check if this client is already using the object.
+ if (entry->clients.find(client) != entry->clients.end()) { return; }
+ // If there are no other clients using this object, notify the eviction policy
+ // that the object is being used.
+ if (entry->clients.size() == 0) {
+ // Tell the eviction policy that this object is being used.
+ std::vector<ObjectID> objects_to_evict;
+ eviction_policy_.begin_object_access(entry->object_id, &objects_to_evict);
+ delete_objects(objects_to_evict);
+ }
+ // Add the client pointer to the list of clients using this object.
+ entry->clients.insert(client);
+}
+
+// Create a new object buffer in the hash table.
+int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
+ int64_t metadata_size, Client* client, PlasmaObject* result) {
+ ARROW_LOG(DEBUG) << "creating object " << object_id.hex();
+ if (store_info_.objects.count(object_id) != 0) {
+ // There is already an object with the same ID in the Plasma Store, so
+ // ignore this requst.
+ return PlasmaError_ObjectExists;
+ }
+ // Try to evict objects until there is enough space.
+ uint8_t* pointer;
+ do {
+ // Allocate space for the new object. We use dlmemalign instead of dlmalloc
+ // in order to align the allocated region to a 64-byte boundary. This is not
+ // strictly necessary, but it is an optimization that could speed up the
+ // computation of a hash of the data (see compute_object_hash_parallel in
+ // plasma_client.cc). Note that even though this pointer is 64-byte aligned,
+ // it is not guaranteed that the corresponding pointer in the client will be
+ // 64-byte aligned, but in practice it often will be.
+ pointer =
+ reinterpret_cast<uint8_t*>(dlmemalign(BLOCK_SIZE, data_size + metadata_size));
+ if (pointer == NULL) {
+ // Tell the eviction policy how much space we need to create this object.
+ std::vector<ObjectID> objects_to_evict;
+ bool success =
+ eviction_policy_.require_space(data_size + metadata_size, &objects_to_evict);
+ delete_objects(objects_to_evict);
+ // Return an error to the client if not enough space could be freed to
+ // create the object.
+ if (!success) { return PlasmaError_OutOfMemory; }
+ }
+ } while (pointer == NULL);
+ int fd;
+ int64_t map_size;
+ ptrdiff_t offset;
+ get_malloc_mapinfo(pointer, &fd, &map_size, &offset);
+ assert(fd != -1);
+
+ auto entry = std::unique_ptr<ObjectTableEntry>(new ObjectTableEntry());
+ entry->object_id = object_id;
+ entry->info.object_id = object_id.binary();
+ entry->info.data_size = data_size;
+ entry->info.metadata_size = metadata_size;
+ entry->pointer = pointer;
+ // TODO(pcm): Set the other fields.
+ entry->fd = fd;
+ entry->map_size = map_size;
+ entry->offset = offset;
+ entry->state = PLASMA_CREATED;
+
+ store_info_.objects[object_id] = std::move(entry);
+ result->handle.store_fd = fd;
+ result->handle.mmap_size = map_size;
+ result->data_offset = offset;
+ result->metadata_offset = offset + data_size;
+ result->data_size = data_size;
+ result->metadata_size = metadata_size;
+ // Notify the eviction policy that this object was created. This must be done
+ // immediately before the call to add_client_to_object_clients so that the
+ // eviction policy does not have an opportunity to evict the object.
+ eviction_policy_.object_created(object_id);
+ // Record that this client is using this object.
+ add_client_to_object_clients(store_info_.objects[object_id].get(), client);
+ return PlasmaError_OK;
+}
+
+void PlasmaObject_init(PlasmaObject* object, ObjectTableEntry* entry) {
+ DCHECK(object != NULL);
+ DCHECK(entry != NULL);
+ DCHECK(entry->state == PLASMA_SEALED);
+ object->handle.store_fd = entry->fd;
+ object->handle.mmap_size = entry->map_size;
+ object->data_offset = entry->offset;
+ object->metadata_offset = entry->offset + entry->info.data_size;
+ object->data_size = entry->info.data_size;
+ object->metadata_size = entry->info.metadata_size;
+}
+
+void PlasmaStore::return_from_get(GetRequest* get_req) {
+ // Send the get reply to the client.
+ Status s = SendGetReply(get_req->client->fd, &get_req->object_ids[0], get_req->objects,
+ get_req->object_ids.size());
+ warn_if_sigpipe(s.ok() ? 0 : -1, get_req->client->fd);
+ // If we successfully sent the get reply message to the client, then also send
+ // the file descriptors.
+ if (s.ok()) {
+ // Send all of the file descriptors for the present objects.
+ for (const auto& object_id : get_req->object_ids) {
+ PlasmaObject& object = get_req->objects[object_id];
+ // We use the data size to indicate whether the object is present or not.
+ if (object.data_size != -1) {
+ int error_code = send_fd(get_req->client->fd, object.handle.store_fd);
+ // If we failed to send the file descriptor, loop until we have sent it
+ // successfully. TODO(rkn): This is problematic for two reasons. First
+ // of all, sending the file descriptor should just succeed without any
+ // errors, but sometimes I see a "Message too long" error number.
+ // Second, looping like this allows a client to potentially block the
+ // plasma store event loop which should never happen.
+ while (error_code < 0) {
+ if (errno == EMSGSIZE) {
+ ARROW_LOG(WARNING) << "Failed to send file descriptor, retrying.";
+ error_code = send_fd(get_req->client->fd, object.handle.store_fd);
+ continue;
+ }
+ warn_if_sigpipe(error_code, get_req->client->fd);
+ break;
+ }
+ }
+ }
+ }
+
+ // Remove the get request from each of the relevant object_get_requests hash
+ // tables if it is present there. It should only be present there if the get
+ // request timed out.
+ for (ObjectID& object_id : get_req->object_ids) {
+ auto& get_requests = object_get_requests_[object_id];
+ // Erase get_req from the vector.
+ auto it = std::find(get_requests.begin(), get_requests.end(), get_req);
+ if (it != get_requests.end()) { get_requests.erase(it); }
+ }
+ // Remove the get request.
+ if (get_req->timer != -1) { ARROW_CHECK(loop_->remove_timer(get_req->timer) == AE_OK); }
+ delete get_req;
+}
+
+void PlasmaStore::update_object_get_requests(const ObjectID& object_id) {
+ std::vector<GetRequest*>& get_requests = object_get_requests_[object_id];
+ size_t index = 0;
+ size_t num_requests = get_requests.size();
+ for (size_t i = 0; i < num_requests; ++i) {
+ GetRequest* get_req = get_requests[index];
+ auto entry = get_object_table_entry(&store_info_, object_id);
+ ARROW_CHECK(entry != NULL);
+
+ PlasmaObject_init(&get_req->objects[object_id], entry);
+ get_req->num_satisfied += 1;
+ // Record the fact that this client will be using this object and will
+ // be responsible for releasing this object.
+ add_client_to_object_clients(entry, get_req->client);
+
+ // If this get request is done, reply to the client.
+ if (get_req->num_satisfied == get_req->num_objects_to_wait_for) {
+ return_from_get(get_req);
+ } else {
+ // The call to return_from_get will remove the current element in the
+ // array, so we only increment the counter in the else branch.
+ index += 1;
+ }
+ }
+
+ DCHECK(index == get_requests.size());
+ // Remove the array of get requests for this object, since no one should be
+ // waiting for this object anymore.
+ object_get_requests_.erase(object_id);
+}
+
+void PlasmaStore::process_get_request(
+ Client* client, const std::vector<ObjectID>& object_ids, int64_t timeout_ms) {
+ // Create a get request for this object.
+ GetRequest* get_req = new GetRequest(client, object_ids);
+
+ for (auto object_id : object_ids) {
+ // Check if this object is already present locally. If so, record that the
+ // object is being used and mark it as accounted for.
+ auto entry = get_object_table_entry(&store_info_, object_id);
+ if (entry && entry->state == PLASMA_SEALED) {
+ // Update the get request to take into account the present object.
+ PlasmaObject_init(&get_req->objects[object_id], entry);
+ get_req->num_satisfied += 1;
+ // If necessary, record that this client is using this object. In the case
+ // where entry == NULL, this will be called from seal_object.
+ add_client_to_object_clients(entry, client);
+ } else {
+ // Add a placeholder plasma object to the get request to indicate that the
+ // object is not present. This will be parsed by the client. We set the
+ // data size to -1 to indicate that the object is not present.
+ get_req->objects[object_id].data_size = -1;
+ // Add the get request to the relevant data structures.
+ object_get_requests_[object_id].push_back(get_req);
+ }
+ }
+
+ // If all of the objects are present already or if the timeout is 0, return to
+ // the client.
+ if (get_req->num_satisfied == get_req->num_objects_to_wait_for || timeout_ms == 0) {
+ return_from_get(get_req);
+ } else if (timeout_ms != -1) {
+ // Set a timer that will cause the get request to return to the client. Note
+ // that a timeout of -1 is used to indicate that no timer should be set.
+ get_req->timer = loop_->add_timer(timeout_ms, [this, get_req](int64_t timer_id) {
+ return_from_get(get_req);
+ return kEventLoopTimerDone;
+ });
+ }
+}
+
+int PlasmaStore::remove_client_from_object_clients(
+ ObjectTableEntry* entry, Client* client) {
+ auto it = entry->clients.find(client);
+ if (it != entry->clients.end()) {
+ entry->clients.erase(it);
+ // If no more clients are using this object, notify the eviction policy
+ // that the object is no longer being used.
+ if (entry->clients.size() == 0) {
+ // Tell the eviction policy that this object is no longer being used.
+ std::vector<ObjectID> objects_to_evict;
+ eviction_policy_.end_object_access(entry->object_id, &objects_to_evict);
+ delete_objects(objects_to_evict);
+ }
+ // Return 1 to indicate that the client was removed.
+ return 1;
+ } else {
+ // Return 0 to indicate that the client was not removed.
+ return 0;
+ }
+}
+
+void PlasmaStore::release_object(const ObjectID& object_id, Client* client) {
+ auto entry = get_object_table_entry(&store_info_, object_id);
+ ARROW_CHECK(entry != NULL);
+ // Remove the client from the object's array of clients.
+ ARROW_CHECK(remove_client_from_object_clients(entry, client) == 1);
+}
+
+// Check if an object is present.
+int PlasmaStore::contains_object(const ObjectID& object_id) {
+ auto entry = get_object_table_entry(&store_info_, object_id);
+ return entry && (entry->state == PLASMA_SEALED) ? OBJECT_FOUND : OBJECT_NOT_FOUND;
+}
+
+// Seal an object that has been created in the hash table.
+void PlasmaStore::seal_object(const ObjectID& object_id, unsigned char digest[]) {
+ ARROW_LOG(DEBUG) << "sealing object " << object_id.hex();
+ auto entry = get_object_table_entry(&store_info_, object_id);
+ ARROW_CHECK(entry != NULL);
+ ARROW_CHECK(entry->state == PLASMA_CREATED);
+ // Set the state of object to SEALED.
+ entry->state = PLASMA_SEALED;
+ // Set the object digest.
+ entry->info.digest = std::string(reinterpret_cast<char*>(&digest[0]), kDigestSize);
+ // Inform all subscribers that a new object has been sealed.
+ push_notification(&entry->info);
+
+ // Update all get requests that involve this object.
+ update_object_get_requests(object_id);
+}
+
+void PlasmaStore::delete_objects(const std::vector<ObjectID>& object_ids) {
+ for (const auto& object_id : object_ids) {
+ ARROW_LOG(DEBUG) << "deleting object " << object_id.hex();
+ auto entry = get_object_table_entry(&store_info_, object_id);
+ // TODO(rkn): This should probably not fail, but should instead throw an
+ // error. Maybe we should also support deleting objects that have been
+ // created but not sealed.
+ ARROW_CHECK(entry != NULL) << "To delete an object it must be in the object table.";
+ ARROW_CHECK(entry->state == PLASMA_SEALED)
+ << "To delete an object it must have been sealed.";
+ ARROW_CHECK(entry->clients.size() == 0)
+ << "To delete an object, there must be no clients currently using it.";
+ dlfree(entry->pointer);
+ store_info_.objects.erase(object_id);
+ // Inform all subscribers that the object has been deleted.
+ ObjectInfoT notification;
+ notification.object_id = object_id.binary();
+ notification.is_deletion = true;
+ push_notification(¬ification);
+ }
+}
+
+void PlasmaStore::connect_client(int listener_sock) {
+ int client_fd = AcceptClient(listener_sock);
+ // This is freed in disconnect_client.
+ Client* client = new Client(client_fd);
+ // Add a callback to handle events on this socket.
+ // TODO(pcm): Check return value.
+ loop_->add_file_event(client_fd, kEventLoopRead, [this, client](int events) {
+ Status s = process_message(client);
+ if (!s.ok()) { ARROW_LOG(FATAL) << "Failed to process file event: " << s; }
+ });
+ ARROW_LOG(DEBUG) << "New connection with fd " << client_fd;
+}
+
+void PlasmaStore::disconnect_client(Client* client) {
+ ARROW_CHECK(client != NULL);
+ ARROW_CHECK(client->fd > 0);
+ loop_->remove_file_event(client->fd);
+ // Close the socket.
+ close(client->fd);
+ ARROW_LOG(INFO) << "Disconnecting client on fd " << client->fd;
+ // If this client was using any objects, remove it from the appropriate
+ // lists.
+ for (const auto& entry : store_info_.objects) {
+ remove_client_from_object_clients(entry.second.get(), client);
+ }
+ // Note, the store may still attempt to send a message to the disconnected
+ // client (for example, when an object ID that the client was waiting for
+ // is ready). In these cases, the attempt to send the message will fail, but
+ // the store should just ignore the failure.
+ delete client;
+}
+
+/// Send notifications about sealed objects to the subscribers. This is called
+/// in seal_object. If the socket's send buffer is full, the notification will
+/// be
+/// buffered, and this will be called again when the send buffer has room.
+///
+/// @param client The client to send the notification to.
+/// @return Void.
+void PlasmaStore::send_notifications(int client_fd) {
+ auto it = pending_notifications_.find(client_fd);
+
+ int num_processed = 0;
+ bool closed = false;
+ // Loop over the array of pending notifications and send as many of them as
+ // possible.
+ for (size_t i = 0; i < it->second.object_notifications.size(); ++i) {
+ uint8_t* notification =
+ reinterpret_cast<uint8_t*>(it->second.object_notifications.at(i));
+ // Decode the length, which is the first bytes of the message.
+ int64_t size = *(reinterpret_cast<int64_t*>(notification));
+
+ // Attempt to send a notification about this object ID.
+ ssize_t nbytes = send(client_fd, notification, sizeof(int64_t) + size, 0);
+ if (nbytes >= 0) {
+ ARROW_CHECK(nbytes == static_cast<ssize_t>(sizeof(int64_t)) + size);
+ } else if (nbytes == -1 &&
+ (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) {
+ ARROW_LOG(DEBUG) << "The socket's send buffer is full, so we are caching this "
+ "notification and will send it later.";
+ // Add a callback to the event loop to send queued notifications whenever
+ // there is room in the socket's send buffer. Callbacks can be added
+ // more than once here and will be overwritten. The callback is removed
+ // at the end of the method.
+ // TODO(pcm): Introduce status codes and check in case the file descriptor
+ // is added twice.
+ loop_->add_file_event(client_fd, kEventLoopWrite,
+ [this, client_fd](int events) { send_notifications(client_fd); });
+ break;
+ } else {
+ ARROW_LOG(WARNING) << "Failed to send notification to client on fd " << client_fd;
+ if (errno == EPIPE) {
+ closed = true;
+ break;
+ }
+ }
+ num_processed += 1;
+ // The corresponding malloc happened in create_object_info_buffer
+ // within push_notification.
+ delete[] notification;
+ }
+ // Remove the sent notifications from the array.
+ it->second.object_notifications.erase(it->second.object_notifications.begin(),
+ it->second.object_notifications.begin() + num_processed);
+
+ // Stop sending notifications if the pipe was broken.
+ if (closed) {
+ close(client_fd);
+ pending_notifications_.erase(client_fd);
+ }
+
+ // If we have sent all notifications, remove the fd from the event loop.
+ if (it->second.object_notifications.empty()) { loop_->remove_file_event(client_fd); }
+}
+
+void PlasmaStore::push_notification(ObjectInfoT* object_info) {
+ for (auto& element : pending_notifications_) {
+ uint8_t* notification = create_object_info_buffer(object_info);
+ element.second.object_notifications.push_back(notification);
+ send_notifications(element.first);
+ // The notification gets freed in send_notifications when the notification
+ // is sent over the socket.
+ }
+}
+
+// Subscribe to notifications about sealed objects.
+void PlasmaStore::subscribe_to_updates(Client* client) {
+ ARROW_LOG(DEBUG) << "subscribing to updates on fd " << client->fd;
+ // TODO(rkn): The store could block here if the client doesn't send a file
+ // descriptor.
+ int fd = recv_fd(client->fd);
+ if (fd < 0) {
+ // This may mean that the client died before sending the file descriptor.
+ ARROW_LOG(WARNING) << "Failed to receive file descriptor from client on fd "
+ << client->fd << ".";
+ return;
+ }
+
+ // Create a new array to buffer notifications that can't be sent to the
+ // subscriber yet because the socket send buffer is full. TODO(rkn): the queue
+ // never gets freed.
+ // TODO(pcm): Is the following neccessary?
+ pending_notifications_[fd];
+
+ // Push notifications to the new subscriber about existing objects.
+ for (const auto& entry : store_info_.objects) {
+ push_notification(&entry.second->info);
+ }
+ send_notifications(fd);
+}
+
// Reads one message from `client`'s socket and handles it according to its
// message type. Replies, where the request has one, are written back on the
// client's socket.
//
// @param client The client whose socket has a message ready to be read.
// @return Status::OK() once the message has been handled, or an error Status
//         from decoding the request (RETURN_NOT_OK) or sending a reply.
//         NOTE(review): HANDLE_SIGPIPE is defined elsewhere; presumably it
//         ignores broken-pipe send errors and returns other errors — confirm.
Status PlasmaStore::process_message(Client* client) {
  int64_t type;
  // NOTE(review): an I/O error is tolerated here. This relies on ReadMessage
  // setting `type` (presumably to DISCONNECT_CLIENT) when the read fails —
  // confirm against ReadMessage's implementation.
  Status s = ReadMessage(client->fd, &type, &input_buffer_);
  ARROW_CHECK(s.ok() || s.IsIOError());

  // input_buffer_ is reused across calls to avoid an allocation per message.
  uint8_t* input = input_buffer_.data();
  ObjectID object_id;
  PlasmaObject object;
  // TODO(pcm): Get rid of the following.
  memset(&object, 0, sizeof(object));

  // Process the different types of requests.
  switch (type) {
    case MessageType_PlasmaCreateRequest: {
      int64_t data_size;
      int64_t metadata_size;
      RETURN_NOT_OK(ReadCreateRequest(input, &object_id, &data_size, &metadata_size));
      int error_code =
          create_object(object_id, data_size, metadata_size, client, &object);
      HANDLE_SIGPIPE(
          SendCreateReply(client->fd, object_id, &object, error_code), client->fd);
      // The file descriptor backing the new object is only passed to the
      // client when creation succeeded.
      if (error_code == PlasmaError_OK) {
        warn_if_sigpipe(send_fd(client->fd, object.handle.store_fd), client->fd);
      }
    } break;
    case MessageType_PlasmaGetRequest: {
      std::vector<ObjectID> object_ids_to_get;
      int64_t timeout_ms;
      RETURN_NOT_OK(ReadGetRequest(input, object_ids_to_get, &timeout_ms));
      // The reply is sent by return_from_get: right away, once the objects are
      // sealed, or when the timeout expires.
      process_get_request(client, object_ids_to_get, timeout_ms);
    } break;
    case MessageType_PlasmaReleaseRequest:
      // Releases get no reply.
      RETURN_NOT_OK(ReadReleaseRequest(input, &object_id));
      release_object(object_id, client);
      break;
    case MessageType_PlasmaContainsRequest:
      RETURN_NOT_OK(ReadContainsRequest(input, &object_id));
      if (contains_object(object_id) == OBJECT_FOUND) {
        HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 1), client->fd);
      } else {
        HANDLE_SIGPIPE(SendContainsReply(client->fd, object_id, 0), client->fd);
      }
      break;
    case MessageType_PlasmaSealRequest: {
      // Seals get no reply; waiting get requests are answered by seal_object.
      unsigned char digest[kDigestSize];
      RETURN_NOT_OK(ReadSealRequest(input, &object_id, &digest[0]));
      seal_object(object_id, &digest[0]);
    } break;
    case MessageType_PlasmaEvictRequest: {
      // This code path should only be used for testing.
      int64_t num_bytes;
      RETURN_NOT_OK(ReadEvictRequest(input, &num_bytes));
      std::vector<ObjectID> objects_to_evict;
      int64_t num_bytes_evicted =
          eviction_policy_.choose_objects_to_evict(num_bytes, &objects_to_evict);
      delete_objects(objects_to_evict);
      HANDLE_SIGPIPE(SendEvictReply(client->fd, num_bytes_evicted), client->fd);
    } break;
    case MessageType_PlasmaSubscribeRequest:
      // The client follows this request with the notification socket's fd,
      // which subscribe_to_updates receives.
      subscribe_to_updates(client);
      break;
    case MessageType_PlasmaConnectRequest: {
      // Tell the client how much memory the store manages.
      HANDLE_SIGPIPE(
          SendConnectReply(client->fd, store_info_.memory_capacity), client->fd);
    } break;
    case DISCONNECT_CLIENT:
      ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
      // disconnect_client deletes `client`; it must not be used afterwards.
      disconnect_client(client);
      break;
    default:
      // This code should be unreachable.
      // NOTE(review): an unknown message type from a client aborts the whole
      // store here.
      ARROW_CHECK(0);
  }
  return Status::OK();
}
+
// Report "success" to valgrind: SIGTERM ends the process with exit status 0.
// Any other signal delivered here is ignored.
void signal_handler(int signal) {
  if (signal != SIGTERM) { return; }
  exit(0);
}
+
+void start_server(char* socket_name, int64_t system_memory) {
+ // Ignore SIGPIPE signals. If we don't do this, then when we attempt to write
+ // to a client that has already died, the store could die.
+ signal(SIGPIPE, SIG_IGN);
+ // Create the event loop.
+ EventLoop loop;
+ PlasmaStore store(&loop, system_memory);
+ int socket = bind_ipc_sock(socket_name, true);
+ ARROW_CHECK(socket >= 0);
+ // TODO(pcm): Check return value.
+ loop.add_file_event(socket, kEventLoopRead,
+ [&store, socket](int events) { store.connect_client(socket); });
+ loop.run();
+}
+
+int main(int argc, char* argv[]) {
+ signal(SIGTERM, signal_handler);
+ char* socket_name = NULL;
+ int64_t system_memory = -1;
+ int c;
+ while ((c = getopt(argc, argv, "s:m:")) != -1) {
+ switch (c) {
+ case 's':
+ socket_name = optarg;
+ break;
+ case 'm': {
+ char extra;
+ int scanned = sscanf(optarg, "%" SCNd64 "%c", &system_memory, &extra);
+ ARROW_CHECK(scanned == 1);
+ ARROW_LOG(INFO) << "Allowing the Plasma store to use up to "
+ << static_cast<double>(system_memory) / 1000000000
+ << "GB of memory.";
+ break;
+ }
+ default:
+ exit(-1);
+ }
+ }
+ if (!socket_name) {
+ ARROW_LOG(FATAL) << "please specify socket for incoming connections with -s switch";
+ }
+ if (system_memory == -1) {
+ ARROW_LOG(FATAL) << "please specify the amount of system memory with -m switch";
+ }
+#ifdef __linux__
+ // On Linux, check that the amount of memory available in /dev/shm is large
+ // enough to accommodate the request. If it isn't, then fail.
+ int shm_fd = open("/dev/shm", O_RDONLY);
+ struct statvfs shm_vfs_stats;
+ fstatvfs(shm_fd, &shm_vfs_stats);
+ // The value shm_vfs_stats.f_bsize is the block size, and the value
+ // shm_vfs_stats.f_bavail is the number of available blocks.
+ int64_t shm_mem_avail = shm_vfs_stats.f_bsize * shm_vfs_stats.f_bavail;
+ close(shm_fd);
+ if (system_memory > shm_mem_avail) {
+ ARROW_LOG(FATAL) << "System memory request exceeds memory available in /dev/shm. The "
+ "request is for "
+ << system_memory << " bytes, and the amount available is "
+ << shm_mem_avail
+ << " bytes. You may be able to free up space by deleting files in "
+ "/dev/shm. If you are inside a Docker container, you may need to "
+ "pass "
+ "an argument with the flag '--shm-size' to 'docker run'.";
+ }
+#endif
+ // Make it so dlmalloc fails if we try to request more memory than is
+ // available.
+ dlmalloc_set_footprint_limit((size_t)system_memory);
+ ARROW_LOG(DEBUG) << "starting server listening on " << socket_name;
+ start_server(socket_name, system_memory);
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/store.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/store.h b/cpp/src/plasma/store.h
new file mode 100644
index 0000000..8bd9426
--- /dev/null
+++ b/cpp/src/plasma/store.h
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PLASMA_STORE_H
+#define PLASMA_STORE_H
+
+#include <deque>
+#include <vector>
+
+#include "plasma/common.h"
+#include "plasma/events.h"
+#include "plasma/eviction_policy.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+
// Defined in store.cc; only pointers to it are stored in this header.
struct GetRequest;

/// Per-subscriber queue of serialized object notifications that have not been
/// sent yet. Entries are kept in the order in which the objects were sealed or
/// deleted, and the subscriber is notified in that order.
struct NotificationQueue {
  std::deque<uint8_t*> object_notifications;
};
+
/// Everything the store keeps about one connected client.
struct Client {
  /// Wraps the client's connection socket.
  explicit Client(int fd);

  /// Socket file descriptor used to talk to this client.
  int fd;
};
+
+class PlasmaStore {
+ public:
+ PlasmaStore(EventLoop* loop, int64_t system_memory);
+
+ ~PlasmaStore();
+
+ /// Create a new object. The client must do a call to release_object to tell
+ /// the store when it is done with the object.
+ ///
+ /// @param object_id Object ID of the object to be created.
+ /// @param data_size Size in bytes of the object to be created.
+ /// @param metadata_size Size in bytes of the object metadata.
+ /// @return One of the following error codes:
+ /// - PlasmaError_OK, if the object was created successfully.
+ /// - PlasmaError_ObjectExists, if an object with this ID is already
+ /// present in the store. In this case, the client should not call
+ /// plasma_release.
+ /// - PlasmaError_OutOfMemory, if the store is out of memory and
+ /// cannot create the object. In this case, the client should not call
+ /// plasma_release.
+ int create_object(const ObjectID& object_id, int64_t data_size, int64_t metadata_size,
+ Client* client, PlasmaObject* result);
+
+ /// Delete objects that have been created in the hash table. This should only
+ /// be called on objects that are returned by the eviction policy to evict.
+ ///
+ /// @param object_ids Object IDs of the objects to be deleted.
+ /// @return Void.
+ void delete_objects(const std::vector<ObjectID>& object_ids);
+
+ /// Process a get request from a client. This method assumes that we will
+ /// eventually have these objects sealed. If one of the objects has not yet
+ /// been sealed, the client that requested the object will be notified when it
+ /// is sealed.
+ ///
+ /// For each object, the client must do a call to release_object to tell the
+ /// store when it is done with the object.
+ ///
+ /// @param client The client making this request.
+ /// @param object_ids Object IDs of the objects to be gotten.
+ /// @param timeout_ms The timeout for the get request in milliseconds.
+ /// @return Void.
+ void process_get_request(
+ Client* client, const std::vector<ObjectID>& object_ids, int64_t timeout_ms);
+
+ /// Seal an object. The object is now immutable and can be accessed with get.
+ ///
+ /// @param object_id Object ID of the object to be sealed.
+ /// @param digest The digest of the object. This is used to tell if two
+ /// objects
+ /// with the same object ID are the same.
+ /// @return Void.
+ void seal_object(const ObjectID& object_id, unsigned char digest[]);
+
+ /// Check if the plasma store contains an object:
+ ///
+ /// @param object_id Object ID that will be checked.
+ /// @return OBJECT_FOUND if the object is in the store, OBJECT_NOT_FOUND if
+ /// not
+ int contains_object(const ObjectID& object_id);
+
+ /// Record the fact that a particular client is no longer using an object.
+ ///
+ /// @param object_id The object ID of the object that is being released.
+ /// @param client The client making this request.
+ /// @param Void.
+ void release_object(const ObjectID& object_id, Client* client);
+
+ /// Subscribe a file descriptor to updates about new sealed objects.
+ ///
+ /// @param client The client making this request.
+ /// @return Void.
+ void subscribe_to_updates(Client* client);
+
+ /// Connect a new client to the PlasmaStore.
+ ///
+ /// @param listener_sock The socket that is listening to incoming connections.
+ /// @return Void.
+ void connect_client(int listener_sock);
+
+ /// Disconnect a client from the PlasmaStore.
+ ///
+ /// @param client The client that is disconnected.
+ /// @return Void.
+ void disconnect_client(Client* client);
+
+ void send_notifications(int client_fd);
+
+ Status process_message(Client* client);
+
+ private:
+ void push_notification(ObjectInfoT* object_notification);
+
+ void add_client_to_object_clients(ObjectTableEntry* entry, Client* client);
+
+ void return_from_get(GetRequest* get_req);
+
+ void update_object_get_requests(const ObjectID& object_id);
+
+ int remove_client_from_object_clients(ObjectTableEntry* entry, Client* client);
+
+ /// Event loop of the plasma store.
+ EventLoop* loop_;
+ /// The plasma store information, including the object tables, that is exposed
+ /// to the eviction policy.
+ PlasmaStoreInfo store_info_;
+ /// The state that is managed by the eviction policy.
+ EvictionPolicy eviction_policy_;
+ /// Input buffer. This is allocated only once to avoid mallocs for every
+ /// call to process_message.
+ std::vector<uint8_t> input_buffer_;
+ /// A hash table mapping object IDs to a vector of the get requests that are
+ /// waiting for the object to arrive.
+ std::unordered_map<ObjectID, std::vector<GetRequest*>, UniqueIDHasher>
+ object_get_requests_;
+ /// The pending notifications that have not been sent to subscribers because
+ /// the socket send buffers were full. This is a hash table from client file
+ /// descriptor to an array of object_ids to send to that client.
+ /// TODO(pcm): Consider putting this into the Client data structure and
+ /// reorganize the code slightly.
+ std::unordered_map<int, NotificationQueue> pending_notifications_;
+};
+
+#endif // PLASMA_STORE_H
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/test/client_tests.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
new file mode 100644
index 0000000..29b5b13
--- /dev/null
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include <assert.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "plasma/client.h"
+#include "plasma/common.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+
+std::string g_test_executable;  // NOLINT
+
+/// Test fixture that starts a plasma_store in the background before each test
+/// and connects client_ to it on /tmp/store.
+///
+/// The plasma_store binary is looked up in the directory of the test
+/// executable (g_test_executable, which main sets from argv[0]).
+class TestPlasmaStore : public ::testing::Test {
+ public:
+  // TODO(pcm): At the moment, stdout of the test gets mixed up with
+  // stdout of the object store. Consider changing that.
+  void SetUp() override {
+    std::string plasma_directory =
+        g_test_executable.substr(0, g_test_executable.find_last_of("/"));
+    std::string plasma_command =
+        plasma_directory +
+        "/plasma_store -m 1000000000 -s /tmp/store 1> /dev/null 2> /dev/null &";
+    system(plasma_command.c_str());
+    ARROW_CHECK_OK(client_.Connect("/tmp/store", "", PLASMA_DEFAULT_RELEASE_DELAY));
+  }
+
+  // gtest invokes TearDown() (not Finish()) after every test, so route the
+  // per-test cleanup through it.
+  void TearDown() override { Finish(); }
+
+  /// Disconnect the client and stop the store started in SetUp.
+  ///
+  /// killall runs synchronously (no trailing '&') so that it cannot race with,
+  /// and kill, the store launched by the next test's SetUp. Note that it stops
+  /// every process named plasma_store, not only the one this fixture started.
+  virtual void Finish() {
+    ARROW_CHECK_OK(client_.Disconnect());
+    system("killall plasma_store");
+  }
+
+ protected:
+  PlasmaClient client_;
+};
+
+TEST_F(TestPlasmaStore, ContainsTest) {
+  ObjectID id = ObjectID::from_random();
+
+  // Nothing has been created yet, so the store must not report the object.
+  bool present;
+  ARROW_CHECK_OK(client_.Contains(id, &present));
+  ASSERT_FALSE(present);
+
+  // Create and seal a 100-byte object carrying one byte of metadata.
+  uint8_t meta[] = {5};
+  int64_t meta_len = sizeof(meta);
+  int64_t payload_len = 100;
+  uint8_t* payload;
+  ARROW_CHECK_OK(client_.Create(id, payload_len, meta, meta_len, &payload));
+  ARROW_CHECK_OK(client_.Seal(id));
+
+  // Blocking Get before checking, to avoid a race with the Plasma Manager
+  // waiting for the seal notification.
+  ObjectBuffer buffer;
+  ARROW_CHECK_OK(client_.Get(&id, 1, -1, &buffer));
+
+  ARROW_CHECK_OK(client_.Contains(id, &present));
+  ASSERT_TRUE(present);
+}
+
+TEST_F(TestPlasmaStore, GetTest) {
+  ObjectID id = ObjectID::from_random();
+  ObjectBuffer buffer;
+
+  // A Get with a zero timeout on a missing object returns at once and
+  // signals the miss with a data_size of -1.
+  ARROW_CHECK_OK(client_.Get(&id, 1, 0, &buffer));
+  ASSERT_EQ(buffer.data_size, -1);
+
+  // Create a 4-byte object holding 0, 1, 2, 3 and seal it.
+  uint8_t meta[] = {5};
+  int64_t meta_len = sizeof(meta);
+  int64_t payload_len = 4;
+  uint8_t* payload;
+  ARROW_CHECK_OK(client_.Create(id, payload_len, meta, meta_len, &payload));
+  int64_t pos = 0;
+  while (pos < payload_len) {
+    payload[pos] = static_cast<uint8_t>(pos % 4);
+    ++pos;
+  }
+  ARROW_CHECK_OK(client_.Seal(id));
+
+  // Fetch it back (blocking) and compare byte by byte.
+  ARROW_CHECK_OK(client_.Get(&id, 1, -1, &buffer));
+  for (pos = 0; pos < payload_len; ++pos) {
+    ASSERT_EQ(payload[pos], buffer.data[pos]);
+  }
+}
+
+TEST_F(TestPlasmaStore, MultipleGetTest) {
+  ObjectID first_id = ObjectID::from_random();
+  ObjectID second_id = ObjectID::from_random();
+  ObjectID ids[2] = {first_id, second_id};
+  ObjectBuffer buffers[2];
+
+  uint8_t meta[] = {5};
+  int64_t meta_len = sizeof(meta);
+  int64_t payload_len = 4;
+  uint8_t* payload;
+
+  // Two objects, tagged through their first byte (1 and 2 respectively).
+  ARROW_CHECK_OK(client_.Create(first_id, payload_len, meta, meta_len, &payload));
+  payload[0] = 1;
+  ARROW_CHECK_OK(client_.Seal(first_id));
+
+  ARROW_CHECK_OK(client_.Create(second_id, payload_len, meta, meta_len, &payload));
+  payload[0] = 2;
+  ARROW_CHECK_OK(client_.Seal(second_id));
+
+  // One blocking Get for both; buffers must come back in request order.
+  ARROW_CHECK_OK(client_.Get(ids, 2, -1, buffers));
+  ASSERT_EQ(buffers[0].data[0], 1);
+  ASSERT_EQ(buffers[1].data[0], 2);
+}
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  // The fixture finds plasma_store in the same directory as this executable.
+  g_test_executable = argv[0];
+  const int result = RUN_ALL_TESTS();
+  return result;
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/test/run_tests.sh
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/run_tests.sh b/cpp/src/plasma/test/run_tests.sh
new file mode 100644
index 0000000..958bd08
--- /dev/null
+++ b/cpp/src/plasma/test/run_tests.sh
@@ -0,0 +1,61 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cause the script to exit if a single command fails.
+set -e
+
+./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 0 &
+sleep 1
+./src/plasma/manager_tests
+killall plasma_store
+./src/plasma/serialization_tests
+
+# Start the Redis shards.
+./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6379 &
+redis_pid1=$!
+./src/common/thirdparty/redis/src/redis-server --loglevel warning --loadmodule ./src/common/redis_module/libray_redis_module.so --port 6380 &
+redis_pid2=$!
+sleep 1
+
+# Flush the redis server
+./src/common/thirdparty/redis/src/redis-cli flushall
+# Register the shard location with the primary shard.
+./src/common/thirdparty/redis/src/redis-cli set NumRedisShards 1
+./src/common/thirdparty/redis/src/redis-cli rpush RedisShards 127.0.0.1:6380
+sleep 1
+./src/plasma/plasma_store -s /tmp/store1 -m 1000000000 &
+plasma1_pid=$!
+./src/plasma/plasma_manager -m /tmp/manager1 -s /tmp/store1 -h 127.0.0.1 -p 11111 -r 127.0.0.1:6379 &
+plasma2_pid=$!
+./src/plasma/plasma_store -s /tmp/store2 -m 1000000000 &
+plasma3_pid=$!
+./src/plasma/plasma_manager -m /tmp/manager2 -s /tmp/store2 -h 127.0.0.1 -p 22222 -r 127.0.0.1:6379 &
+plasma4_pid=$!
+sleep 1
+
+./src/plasma/client_tests
+
+kill $plasma4_pid
+kill $plasma3_pid
+kill $plasma2_pid
+kill $plasma1_pid
+kill $redis_pid1
+wait $redis_pid1
+kill $redis_pid2
+wait $redis_pid2
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/test/run_valgrind.sh
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/run_valgrind.sh b/cpp/src/plasma/test/run_valgrind.sh
new file mode 100644
index 0000000..0472194
--- /dev/null
+++ b/cpp/src/plasma/test/run_valgrind.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Cause the script to exit if a single command fails.
+set -e
+
+./src/plasma/plasma_store -s /tmp/plasma_store_socket_1 -m 0 &
+plasma_store_pid=$!
+# Stop the store even if a valgrind run fails and `set -e` aborts the script
+# before the killall below; otherwise the store would keep running.
+trap 'kill "$plasma_store_pid" 2> /dev/null || true' EXIT
+sleep 1
+valgrind --leak-check=full --error-exitcode=1 ./src/plasma/manager_tests
+killall plasma_store
+valgrind --leak-check=full --error-exitcode=1 ./src/plasma/serialization_tests
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/test/serialization_tests.cc
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/test/serialization_tests.cc b/cpp/src/plasma/test/serialization_tests.cc
new file mode 100644
index 0000000..325cead
--- /dev/null
+++ b/cpp/src/plasma/test/serialization_tests.cc
@@ -0,0 +1,388 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include <sys/types.h>
+#include <unistd.h>
+
+#include "plasma/common.h"
+#include "plasma/io.h"
+#include "plasma/plasma.h"
+#include "plasma/protocol.h"
+
+/**
+ * Create a temporary file in /tmp and return an open descriptor to it.
+ *
+ * The file is unlinked right after creation, so it has no name on disk and
+ * its storage is reclaimed automatically once the descriptor is closed.
+ * Needs to be closed by the caller.
+ *
+ * @return File descriptor of the file, or -1 if mkstemp failed.
+ */
+int create_temp_file(void) {
+  // mkstemp rewrites the XXXXXX suffix in place, so it needs a writable buffer.
+  char file_name[] = "/tmp/tempfileXXXXXX";
+  int fd = mkstemp(file_name);
+  if (fd != -1) {
+    // Drop the directory entry; the open descriptor keeps the file usable.
+    unlink(file_name);
+  }
+  return fd;
+}
+
+/**
+ * Seek to the beginning of a file and read a single message from it.
+ *
+ * Aborts via ARROW_CHECK if seeking fails, if ReadMessage fails, or if the
+ * message type read differs from message_type.
+ *
+ * @param fd File descriptor of the file.
+ * @param message_type Message type that we expect in the file.
+ *
+ * @return The message payload filled in by ReadMessage (owned by the caller
+ *         as a std::vector; nothing needs to be freed manually).
+ */
+std::vector<uint8_t> read_message_from_file(int fd, int message_type) {
+  /* Go to the beginning of the file; an unchecked failure here would make
+   * ReadMessage read from the wrong position. */
+  ARROW_CHECK(lseek(fd, 0, SEEK_SET) == 0);
+  int64_t type;
+  std::vector<uint8_t> data;
+  ARROW_CHECK_OK(ReadMessage(fd, &type, &data));
+  ARROW_CHECK(type == message_type);
+  return data;
+}
+
+/**
+ * Build a zero-initialized PlasmaObject whose numeric fields hold
+ * pseudo-random values, for serialization round-trip tests.
+ *
+ * The generator state is kept across calls, so consecutive calls return
+ * different objects even within the same second. The random base is bounded
+ * so that adding the small per-field offsets below cannot overflow an int.
+ */
+PlasmaObject random_plasma_object(void) {
+  static unsigned int seed = static_cast<unsigned int>(time(NULL));
+  int random = rand_r(&seed) % 1000000;
+  PlasmaObject object;
+  // Zero everything (including padding) so memcmp-based comparisons work.
+  memset(&object, 0, sizeof(object));
+  object.handle.store_fd = random + 7;
+  object.handle.mmap_size = random + 42;
+  object.data_offset = random + 1;
+  object.metadata_offset = random + 2;
+  object.data_size = random + 3;
+  object.metadata_size = random + 4;
+  return object;
+}
+
+TEST(PlasmaSerialization, CreateRequest) {
+  int fd = create_temp_file();
+  ObjectID sent_id = ObjectID::from_random();
+  int64_t sent_data_size = 42;
+  int64_t sent_metadata_size = 11;
+  ARROW_CHECK_OK(SendCreateRequest(fd, sent_id, sent_data_size, sent_metadata_size));
+
+  // Read the request back and check that every field survived the round trip.
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaCreateRequest);
+  ObjectID got_id;
+  int64_t got_data_size;
+  int64_t got_metadata_size;
+  ARROW_CHECK_OK(
+      ReadCreateRequest(msg.data(), &got_id, &got_data_size, &got_metadata_size));
+  ASSERT_EQ(sent_data_size, got_data_size);
+  ASSERT_EQ(sent_metadata_size, got_metadata_size);
+  ASSERT_EQ(sent_id, got_id);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, CreateReply) {
+  int fd = create_temp_file();
+  ObjectID sent_id = ObjectID::from_random();
+  PlasmaObject sent_object = random_plasma_object();
+  ARROW_CHECK_OK(SendCreateReply(fd, sent_id, &sent_object, 0));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaCreateReply);
+  ObjectID got_id;
+  PlasmaObject got_object;
+  // Zero the output so bytes not written by ReadCreateReply match in memcmp.
+  memset(&got_object, 0, sizeof(got_object));
+  ARROW_CHECK_OK(ReadCreateReply(msg.data(), &got_id, &got_object));
+  ASSERT_EQ(sent_id, got_id);
+  ASSERT_EQ(memcmp(&sent_object, &got_object, sizeof(sent_object)), 0);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, SealRequest) {
+  int fd = create_temp_file();
+  ObjectID sent_id = ObjectID::from_random();
+  // Use a recognizable digest: every byte set to 7.
+  unsigned char sent_digest[kDigestSize];
+  memset(sent_digest, 7, kDigestSize);
+  ARROW_CHECK_OK(SendSealRequest(fd, sent_id, sent_digest));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaSealRequest);
+  ObjectID got_id;
+  unsigned char got_digest[kDigestSize];
+  ARROW_CHECK_OK(ReadSealRequest(msg.data(), &got_id, got_digest));
+  ASSERT_EQ(sent_id, got_id);
+  ASSERT_EQ(memcmp(sent_digest, got_digest, kDigestSize), 0);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, SealReply) {
+  int fd = create_temp_file();
+  ObjectID sent_id = ObjectID::from_random();
+  // Encode an ObjectExists error; it must come back as the decoded status.
+  ARROW_CHECK_OK(SendSealReply(fd, sent_id, PlasmaError_ObjectExists));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaSealReply);
+  ObjectID got_id;
+  Status status = ReadSealReply(msg.data(), &got_id);
+  ASSERT_EQ(sent_id, got_id);
+  ASSERT_TRUE(status.IsPlasmaObjectExists());
+  close(fd);
+}
+
+TEST(PlasmaSerialization, GetRequest) {
+  int fd = create_temp_file();
+  ObjectID sent_ids[2];
+  for (int k = 0; k < 2; ++k) {
+    sent_ids[k] = ObjectID::from_random();
+  }
+  int64_t sent_timeout_ms = 1234;
+  ARROW_CHECK_OK(SendGetRequest(fd, sent_ids, 2, sent_timeout_ms));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaGetRequest);
+  std::vector<ObjectID> got_ids;
+  int64_t got_timeout_ms;
+  ARROW_CHECK_OK(ReadGetRequest(msg.data(), got_ids, &got_timeout_ms));
+  ASSERT_EQ(sent_ids[0], got_ids[0]);
+  ASSERT_EQ(sent_ids[1], got_ids[1]);
+  ASSERT_EQ(sent_timeout_ms, got_timeout_ms);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, GetReply) {
+  int fd = create_temp_file();
+  ObjectID ids[2];
+  ids[0] = ObjectID::from_random();
+  ids[1] = ObjectID::from_random();
+  std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher> objects;
+  objects[ids[0]] = random_plasma_object();
+  objects[ids[1]] = random_plasma_object();
+  ARROW_CHECK_OK(SendGetReply(fd, ids, objects, 2));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaGetReply);
+  ObjectID ids_read[2];
+  PlasmaObject objects_read[2];
+  // Zero the output so bytes not written by ReadGetReply match in memcmp.
+  memset(&objects_read, 0, sizeof(objects_read));
+  ARROW_CHECK_OK(ReadGetReply(msg.data(), ids_read, &objects_read[0], 2));
+  for (int k = 0; k < 2; ++k) {
+    ASSERT_EQ(ids[k], ids_read[k]);
+  }
+  for (int k = 0; k < 2; ++k) {
+    ASSERT_EQ(memcmp(&objects[ids[k]], &objects_read[k], sizeof(PlasmaObject)), 0);
+  }
+  close(fd);
+}
+
+TEST(PlasmaSerialization, ReleaseRequest) {
+  int fd = create_temp_file();
+  ObjectID sent_id = ObjectID::from_random();
+  ARROW_CHECK_OK(SendReleaseRequest(fd, sent_id));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaReleaseRequest);
+  ObjectID got_id;
+  ARROW_CHECK_OK(ReadReleaseRequest(msg.data(), &got_id));
+  ASSERT_EQ(sent_id, got_id);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, ReleaseReply) {
+  int fd = create_temp_file();
+  ObjectID sent_id = ObjectID::from_random();
+  // The encoded error code must be reported back through the Status.
+  ARROW_CHECK_OK(SendReleaseReply(fd, sent_id, PlasmaError_ObjectExists));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaReleaseReply);
+  ObjectID got_id;
+  Status status = ReadReleaseReply(msg.data(), &got_id);
+  ASSERT_EQ(sent_id, got_id);
+  ASSERT_TRUE(status.IsPlasmaObjectExists());
+  close(fd);
+}
+
+TEST(PlasmaSerialization, DeleteRequest) {
+  int fd = create_temp_file();
+  ObjectID sent_id = ObjectID::from_random();
+  ARROW_CHECK_OK(SendDeleteRequest(fd, sent_id));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaDeleteRequest);
+  ObjectID got_id;
+  ARROW_CHECK_OK(ReadDeleteRequest(msg.data(), &got_id));
+  ASSERT_EQ(sent_id, got_id);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, DeleteReply) {
+  int fd = create_temp_file();
+  ObjectID sent_id = ObjectID::from_random();
+  int sent_error = PlasmaError_ObjectExists;
+  ARROW_CHECK_OK(SendDeleteReply(fd, sent_id, sent_error));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaDeleteReply);
+  ObjectID got_id;
+  Status status = ReadDeleteReply(msg.data(), &got_id);
+  ASSERT_EQ(sent_id, got_id);
+  ASSERT_TRUE(status.IsPlasmaObjectExists());
+  close(fd);
+}
+
+TEST(PlasmaSerialization, StatusRequest) {
+  int fd = create_temp_file();
+  // A const bound initialized from a literal is a constant expression, so the
+  // arrays below are ordinary arrays rather than variable-length arrays
+  // (a compiler extension, not standard C++).
+  const int64_t num_objects = 2;
+  ObjectID object_ids[num_objects];
+  object_ids[0] = ObjectID::from_random();
+  object_ids[1] = ObjectID::from_random();
+  ARROW_CHECK_OK(SendStatusRequest(fd, object_ids, num_objects));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaStatusRequest);
+  ObjectID object_ids_read[num_objects];
+  ARROW_CHECK_OK(ReadStatusRequest(data.data(), object_ids_read, num_objects));
+  ASSERT_EQ(object_ids[0], object_ids_read[0]);
+  ASSERT_EQ(object_ids[1], object_ids_read[1]);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, StatusReply) {
+  int fd = create_temp_file();
+  ObjectID object_ids[2];
+  object_ids[0] = ObjectID::from_random();
+  object_ids[1] = ObjectID::from_random();
+  int object_statuses[2] = {42, 43};
+  ARROW_CHECK_OK(SendStatusReply(fd, object_ids, object_statuses, 2));
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaStatusReply);
+  int64_t num_objects = ReadStatusReply_num_objects(data.data());
+  // Check the count before indexing element 0 and 1 below.
+  ASSERT_EQ(num_objects, 2);
+  // The count is only known at runtime, so use vectors instead of
+  // variable-length arrays (not standard C++).
+  std::vector<ObjectID> object_ids_read(num_objects);
+  std::vector<int> object_statuses_read(num_objects);
+  ARROW_CHECK_OK(ReadStatusReply(data.data(), object_ids_read.data(),
+                                 object_statuses_read.data(), num_objects));
+  ASSERT_EQ(object_ids[0], object_ids_read[0]);
+  ASSERT_EQ(object_ids[1], object_ids_read[1]);
+  ASSERT_EQ(object_statuses[0], object_statuses_read[0]);
+  ASSERT_EQ(object_statuses[1], object_statuses_read[1]);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, EvictRequest) {
+  int fd = create_temp_file();
+  int64_t requested_bytes = 111;
+  ARROW_CHECK_OK(SendEvictRequest(fd, requested_bytes));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaEvictRequest);
+  int64_t decoded_bytes;
+  ARROW_CHECK_OK(ReadEvictRequest(msg.data(), &decoded_bytes));
+  ASSERT_EQ(requested_bytes, decoded_bytes);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, EvictReply) {
+  int fd = create_temp_file();
+  int64_t evicted_bytes = 111;
+  ARROW_CHECK_OK(SendEvictReply(fd, evicted_bytes));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaEvictReply);
+  int64_t decoded_bytes;
+  // ReadEvictReply takes its output argument by reference.
+  ARROW_CHECK_OK(ReadEvictReply(msg.data(), decoded_bytes));
+  ASSERT_EQ(evicted_bytes, decoded_bytes);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, FetchRequest) {
+  int fd = create_temp_file();
+  ObjectID sent_ids[2];
+  for (int k = 0; k < 2; ++k) {
+    sent_ids[k] = ObjectID::from_random();
+  }
+  ARROW_CHECK_OK(SendFetchRequest(fd, sent_ids, 2));
+
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaFetchRequest);
+  std::vector<ObjectID> got_ids;
+  ARROW_CHECK_OK(ReadFetchRequest(msg.data(), got_ids));
+  ASSERT_EQ(sent_ids[0], got_ids[0]);
+  ASSERT_EQ(sent_ids[1], got_ids[1]);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, WaitRequest) {
+  int fd = create_temp_file();
+  const int num_objects_in = 2;
+  ObjectRequest object_requests_in[num_objects_in] = {
+      ObjectRequest({ObjectID::from_random(), PLASMA_QUERY_ANYWHERE, 0}),
+      ObjectRequest({ObjectID::from_random(), PLASMA_QUERY_LOCAL, 0})};
+  const int num_ready_objects_in = 1;
+  int64_t timeout_ms = 1000;
+
+  ARROW_CHECK_OK(SendWaitRequest(
+      fd, &object_requests_in[0], num_objects_in, num_ready_objects_in, timeout_ms));
+  /* Read message back. */
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaWaitRequest);
+  int num_ready_objects_out;
+  int64_t timeout_ms_read;
+  ObjectRequestMap object_requests_out;
+  ARROW_CHECK_OK(ReadWaitRequest(
+      data.data(), object_requests_out, &timeout_ms_read, &num_ready_objects_out));
+  // Compare container sizes/counts as unsigned values to avoid signed/unsigned
+  // comparison warnings inside the gtest comparison helpers.
+  ASSERT_EQ(static_cast<size_t>(num_objects_in), object_requests_out.size());
+  ASSERT_EQ(num_ready_objects_out, num_ready_objects_in);
+  // The timeout is part of the request as well, so it must round-trip too.
+  ASSERT_EQ(timeout_ms, timeout_ms_read);
+  for (int i = 0; i < num_objects_in; i++) {
+    const ObjectID& object_id = object_requests_in[i].object_id;
+    ASSERT_EQ(1u, object_requests_out.count(object_id));
+    auto entry = object_requests_out.find(object_id);
+    ASSERT_TRUE(entry != object_requests_out.end());
+    ASSERT_EQ(entry->second.object_id, object_requests_in[i].object_id);
+    ASSERT_EQ(entry->second.type, object_requests_in[i].type);
+  }
+  close(fd);
+}
+
+TEST(PlasmaSerialization, WaitReply) {
+  int fd = create_temp_file();
+  const int num_objects_in = 2;
+  /* Create a map with two ObjectRequests in it. */
+  ObjectRequestMap objects_in(num_objects_in);
+  ObjectID id1 = ObjectID::from_random();
+  objects_in[id1] = ObjectRequest({id1, 0, ObjectStatus_Local});
+  ObjectID id2 = ObjectID::from_random();
+  objects_in[id2] = ObjectRequest({id2, 0, ObjectStatus_Nonexistent});
+
+  ARROW_CHECK_OK(SendWaitReply(fd, objects_in, num_objects_in));
+  /* Read message back. */
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaWaitReply);
+  ObjectRequest objects_out[2];
+  int num_objects_out;
+  ARROW_CHECK_OK(ReadWaitReply(data.data(), &objects_out[0], &num_objects_out));
+  ASSERT_EQ(num_objects_in, num_objects_out);
+  for (int i = 0; i < num_objects_out; i++) {
+    /* Each object request must appear exactly once. count() is unsigned, so
+     * compare against an unsigned literal to avoid sign-compare warnings. */
+    ASSERT_EQ(objects_in.count(objects_out[i].object_id), 1u);
+    auto entry = objects_in.find(objects_out[i].object_id);
+    ASSERT_TRUE(entry != objects_in.end());
+    ASSERT_EQ(entry->second.object_id, objects_out[i].object_id);
+    ASSERT_EQ(entry->second.status, objects_out[i].status);
+  }
+  close(fd);
+}
+
+TEST(PlasmaSerialization, DataRequest) {
+  int fd = create_temp_file();
+  ObjectID sent_id = ObjectID::from_random();
+  const char* sent_address = "address1";
+  int sent_port = 12345;
+  ARROW_CHECK_OK(SendDataRequest(fd, sent_id, sent_address, sent_port));
+
+  /* Decode the request we just wrote. */
+  std::vector<uint8_t> msg = read_message_from_file(fd, MessageType_PlasmaDataRequest);
+  ObjectID got_id;
+  char* got_address;  // allocated by ReadDataRequest; released with free() below
+  int got_port;
+  ARROW_CHECK_OK(ReadDataRequest(msg.data(), &got_id, &got_address, &got_port));
+  ASSERT_EQ(sent_id, got_id);
+  ASSERT_EQ(strcmp(sent_address, got_address), 0);
+  ASSERT_EQ(sent_port, got_port);
+  free(got_address);
+  close(fd);
+}
+
+TEST(PlasmaSerialization, DataReply) {
+  int fd = create_temp_file();
+  ObjectID object_id1 = ObjectID::from_random();
+  int64_t object_size1 = 146;
+  int64_t metadata_size1 = 198;
+  ARROW_CHECK_OK(SendDataReply(fd, object_id1, object_size1, metadata_size1));
+  /* Reading message back. */
+  std::vector<uint8_t> data = read_message_from_file(fd, MessageType_PlasmaDataReply);
+  ObjectID object_id2;
+  int64_t object_size2;
+  int64_t metadata_size2;
+  ARROW_CHECK_OK(ReadDataReply(data.data(), &object_id2, &object_size2, &metadata_size2));
+  ASSERT_EQ(object_id1, object_id2);
+  ASSERT_EQ(object_size1, object_size2);
+  ASSERT_EQ(metadata_size1, metadata_size2);
+  // Release the temporary file descriptor opened at the top of the test.
+  close(fd);
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/thirdparty/ae/ae.c
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/thirdparty/ae/ae.c b/cpp/src/plasma/thirdparty/ae/ae.c
new file mode 100644
index 0000000..e66808a
--- /dev/null
+++ b/cpp/src/plasma/thirdparty/ae/ae.c
@@ -0,0 +1,465 @@
+/* A simple event-driven programming library. Originally I wrote this code
+ * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
+ * it in form of a library for easy reuse.
+ *
+ * Copyright (c) 2006-2010, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <unistd.h>
+#include <stdlib.h>
+#include <poll.h>
+#include <string.h>
+#include <time.h>
+#include <errno.h>
+
+#include "ae.h"
+#include "zmalloc.h"
+#include "config.h"
+
+/* Include the best multiplexing layer supported by this system.
+ * The following should be ordered by performance, descending. */
+#ifdef HAVE_EVPORT
+#include "ae_evport.c"
+#else
+ #ifdef HAVE_EPOLL
+ #include "ae_epoll.c"
+ #else
+ #ifdef HAVE_KQUEUE
+ #include "ae_kqueue.c"
+ #else
+ #include "ae_select.c"
+ #endif
+ #endif
+#endif
+
+/* Allocate and initialize an event loop that can track file descriptors
+ * below `setsize`. Returns NULL if an allocation or the backend setup
+ * (aeApiCreate) fails; anything already allocated is released first. */
+aeEventLoop *aeCreateEventLoop(int setsize) {
+    aeEventLoop *loop;
+    int fd;
+
+    loop = zmalloc(sizeof(*loop));
+    if (loop == NULL) return NULL;
+
+    loop->events = zmalloc(sizeof(aeFileEvent)*setsize);
+    loop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
+    if (loop->events != NULL && loop->fired != NULL) {
+        loop->setsize = setsize;
+        loop->lastTime = time(NULL);
+        loop->timeEventHead = NULL;
+        loop->timeEventNextId = 0;
+        loop->stop = 0;
+        loop->maxfd = -1;
+        loop->beforesleep = NULL;
+        if (aeApiCreate(loop) != -1) {
+            /* A mask of AE_NONE marks a slot as not registered. */
+            for (fd = 0; fd < setsize; fd++)
+                loop->events[fd].mask = AE_NONE;
+            return loop;
+        }
+    }
+
+    /* Failure path: free the partially built loop. */
+    zfree(loop->events);
+    zfree(loop->fired);
+    zfree(loop);
+    return NULL;
+}
+
+/* Report how many file descriptor slots this event loop currently has. */
+int aeGetSetSize(aeEventLoop *eventLoop) {
+    int size = eventLoop->setsize;
+    return size;
+}
+
+/* Change the number of file descriptor slots of the event loop.
+ *
+ * AE_ERR is returned, and nothing is changed, when a registered descriptor
+ * would not fit (maxfd >= setsize) or when the backend refuses to resize
+ * (aeApiResize fails). Otherwise the tables are reallocated, any slots past
+ * maxfd are reset to AE_NONE, and AE_OK is returned. Requesting the current
+ * size is a no-op that returns AE_OK. */
+int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) {
+    int slot;
+
+    if (eventLoop->setsize == setsize) return AE_OK;
+    if (eventLoop->maxfd >= setsize) return AE_ERR;
+    if (aeApiResize(eventLoop, setsize) == -1) return AE_ERR;
+
+    eventLoop->events = zrealloc(eventLoop->events, sizeof(aeFileEvent)*setsize);
+    eventLoop->fired = zrealloc(eventLoop->fired, sizeof(aeFiredEvent)*setsize);
+    eventLoop->setsize = setsize;
+
+    slot = eventLoop->maxfd + 1;
+    while (slot < setsize) {
+        eventLoop->events[slot].mask = AE_NONE;
+        slot++;
+    }
+    return AE_OK;
+}
+
+/* Destroy an event loop created by aeCreateEventLoop. It releases the
+ * backend state (via aeApiFree), the file event tables, every time event
+ * still on the list, and the loop itself. Finalizer callbacks of pending
+ * time events are not invoked. */
+void aeDeleteEventLoop(aeEventLoop *eventLoop) {
+    aeTimeEvent *te, *next;
+
+    aeApiFree(eventLoop);
+    zfree(eventLoop->events);
+    zfree(eventLoop->fired);
+
+    /* Time events are heap-allocated by aeCreateTimeEvent and would
+     * otherwise be leaked together with the loop. */
+    te = eventLoop->timeEventHead;
+    while (te) {
+        next = te->next;
+        zfree(te);
+        te = next;
+    }
+    zfree(eventLoop);
+}
+
+/* Request that the event loop stop: this only raises the loop's stop flag. */
+void aeStop(aeEventLoop *eventLoop)
+{
+    eventLoop->stop = 1;
+}
+
+/* Register `proc` for the events in `mask` on descriptor `fd`.
+ * Fails with AE_ERR (errno = ERANGE) when fd does not fit in the loop's
+ * set size, or with AE_ERR when the backend rejects the registration. */
+int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
+        aeFileProc *proc, void *clientData)
+{
+    aeFileEvent *slot;
+
+    if (fd >= eventLoop->setsize) {
+        errno = ERANGE;
+        return AE_ERR;
+    }
+    slot = &eventLoop->events[fd];
+
+    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
+        return AE_ERR;
+
+    slot->mask |= mask;
+    if (mask & AE_READABLE) slot->rfileProc = proc;
+    if (mask & AE_WRITABLE) slot->wfileProc = proc;
+    slot->clientData = clientData;
+    /* Keep maxfd as the highest registered descriptor. */
+    if (eventLoop->maxfd < fd)
+        eventLoop->maxfd = fd;
+    return AE_OK;
+}
+
+/* Unregister the events in `mask` for descriptor `fd`. Out-of-range or
+ * unregistered descriptors are ignored. */
+void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask)
+{
+    aeFileEvent *slot;
+    int j;
+
+    if (fd >= eventLoop->setsize) return;
+    slot = &eventLoop->events[fd];
+    if (slot->mask == AE_NONE) return;
+
+    aeApiDelEvent(eventLoop, fd, mask);
+    slot->mask &= ~mask;
+
+    /* Only when the highest registered descriptor became idle does maxfd
+     * need recomputing: walk down to the next slot that still has events,
+     * ending at -1 if none remain. */
+    if (fd != eventLoop->maxfd || slot->mask != AE_NONE) return;
+    j = eventLoop->maxfd - 1;
+    while (j >= 0 && eventLoop->events[j].mask == AE_NONE)
+        j--;
+    eventLoop->maxfd = j;
+}
+
+/* Return the event mask registered for `fd`; descriptors at or beyond the
+ * set size report 0. */
+int aeGetFileEvents(aeEventLoop *eventLoop, int fd) {
+    return (fd < eventLoop->setsize) ? eventLoop->events[fd].mask : 0;
+}
+
+/* Split the current wall-clock time (gettimeofday) into whole seconds and
+ * the millisecond part of the current second. */
+static void aeGetTime(long *seconds, long *milliseconds)
+{
+    struct timeval now;
+
+    gettimeofday(&now, NULL);
+    *seconds = now.tv_sec;
+    *milliseconds = now.tv_usec / 1000;
+}
+
+/* Compute "now + milliseconds" as a (seconds, milliseconds) pair. */
+static void aeAddMillisecondsToNow(long long milliseconds, long *sec, long *ms) {
+    long now_sec, now_ms;
+    long target_sec, target_ms;
+
+    aeGetTime(&now_sec, &now_ms);
+    target_sec = now_sec + milliseconds / 1000;
+    target_ms = now_ms + milliseconds % 1000;
+    /* Carry a whole second out of the millisecond field. */
+    if (target_ms >= 1000) {
+        target_sec++;
+        target_ms -= 1000;
+    }
+    *sec = target_sec;
+    *ms = target_ms;
+}
+
+/* Schedule `proc` to run `milliseconds` from now and return the new timer's
+ * id, or AE_ERR if the timer could not be allocated. */
+long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
+        aeTimeProc *proc, void *clientData,
+        aeEventFinalizerProc *finalizerProc)
+{
+    /* The id is consumed even when the allocation below fails. */
+    long long id = eventLoop->timeEventNextId++;
+    aeTimeEvent *ev = zmalloc(sizeof(*ev));
+
+    if (ev == NULL) return AE_ERR;
+    ev->id = id;
+    aeAddMillisecondsToNow(milliseconds, &ev->when_sec, &ev->when_ms);
+    ev->timeProc = proc;
+    ev->finalizerProc = finalizerProc;
+    ev->clientData = clientData;
+    /* New timers are pushed onto the head of the unsorted list. */
+    ev->next = eventLoop->timeEventHead;
+    eventLoop->timeEventHead = ev;
+    return id;
+}
+
+/* Mark the timer with the given id as deleted. The event is only flagged
+ * here (id = AE_DELETED_EVENT_ID); processTimeEvents later unlinks it, runs
+ * its finalizer and frees it. Returns AE_ERR if no timer has that id. */
+int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id)
+{
+    aeTimeEvent *ev;
+
+    for (ev = eventLoop->timeEventHead; ev != NULL; ev = ev->next) {
+        if (ev->id != id) continue;
+        ev->id = AE_DELETED_EVENT_ID;
+        return AE_OK;
+    }
+    return AE_ERR; /* NO event with the specified ID found */
+}
+
+/* Find the timer that will fire first, or NULL when there are no timers.
+ * This tells the event loop how long the multiplexing call may sleep
+ * without delaying any timer.
+ *
+ * This is O(N) since time events are kept unsorted.
+ * Possible optimizations (not needed by Redis so far, but...):
+ * 1) Insert the event in order, so that the nearest is just the head.
+ *    Much better but still insertion or deletion of timers is O(N).
+ * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
+ */
+static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
+{
+    aeTimeEvent *nearest = NULL;
+    aeTimeEvent *ev;
+
+    for (ev = eventLoop->timeEventHead; ev != NULL; ev = ev->next) {
+        int earlier;
+
+        if (nearest == NULL) {
+            nearest = ev;
+            continue;
+        }
+        earlier = ev->when_sec < nearest->when_sec ||
+                  (ev->when_sec == nearest->when_sec &&
+                   ev->when_ms < nearest->when_ms);
+        if (earlier) nearest = ev;
+    }
+    return nearest;
+}
+
+/* Process time events */
+static int processTimeEvents(aeEventLoop *eventLoop) {
+ int processed = 0;
+ aeTimeEvent *te, *prev;
+ long long maxId;
+ time_t now = time(NULL);
+
+ /* If the system clock is moved to the future, and then set back to the
+ * right value, time events may be delayed in a random way. Often this
+ * means that scheduled operations will not be performed soon enough.
+ *
+ * Here we try to detect system clock skews, and force all the time
+ * events to be processed ASAP when this happens: the idea is that
+ * processing events earlier is less dangerous than delaying them
+ * indefinitely, and practice suggests it is. */
+ if (now < eventLoop->lastTime) {
+ te = eventLoop->timeEventHead;
+ while(te) {
+ te->when_sec = 0;
+ te = te->next;
+ }
+ }
+ eventLoop->lastTime = now;
+
+ prev = NULL;
+ te = eventLoop->timeEventHead;
+ maxId = eventLoop->timeEventNextId-1;
+ while(te) {
+ long now_sec, now_ms;
+ long long id;
+
+ /* Remove events scheduled for deletion. */
+ if (te->id == AE_DELETED_EVENT_ID) {
+ aeTimeEvent *next = te->next;
+ if (prev == NULL)
+ eventLoop->timeEventHead = te->next;
+ else
+ prev->next = te->next;
+ if (te->finalizerProc)
+ te->finalizerProc(eventLoop, te->clientData);
+ zfree(te);
+ te = next;
+ continue;
+ }
+
+ /* Make sure we don't process time events created by time events in
+ * this iteration. Note that this check is currently useless: we always
+ * add new timers on the head, however if we change the implementation
+ * detail, this check may be useful again: we keep it here for future
+ * defense. */
+ if (te->id > maxId) {
+ te = te->next;
+ continue;
+ }
+ aeGetTime(&now_sec, &now_ms);
+ if (now_sec > te->when_sec ||
+ (now_sec == te->when_sec && now_ms >= te->when_ms))
+ {
+ int retval;
+
+ id = te->id;
+ retval = te->timeProc(eventLoop, id, te->clientData);
+ processed++;
+ if (retval != AE_NOMORE) {
+ aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
+ } else {
+ te->id = AE_DELETED_EVENT_ID;
+ }
+ }
+ prev = te;
+ te = te->next;
+ }
+ return processed;
+}
+
+/* Process every pending time event, then every pending file event
+ * (that may be registered by time event callbacks just processed).
+ * Without special flags the function sleeps until some file event
+ * fires, or when the next time event occurs (if any).
+ *
+ * If flags is 0, the function does nothing and returns.
+ * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
+ * if flags has AE_FILE_EVENTS set, file events are processed.
+ * if flags has AE_TIME_EVENTS set, time events are processed.
+ * if flags has AE_DONT_WAIT set the function returns ASAP until all
+ * the events that's possible to process without to wait are processed.
+ *
+ * The function returns the number of events processed. */
+int aeProcessEvents(aeEventLoop *eventLoop, int flags)
+{
+ int processed = 0, numevents;
+
+ /* Nothing to do? return ASAP */
+ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
+
+ /* Note that we want call select() even if there are no
+ * file events to process as long as we want to process time
+ * events, in order to sleep until the next time event is ready
+ * to fire. */
+ if (eventLoop->maxfd != -1 ||
+ ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
+ int j;
+ aeTimeEvent *shortest = NULL;
+ struct timeval tv, *tvp;
+
+ if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
+ shortest = aeSearchNearestTimer(eventLoop);
+ if (shortest) {
+ long now_sec, now_ms;
+
+ aeGetTime(&now_sec, &now_ms);
+ tvp = &tv;
+
+ /* How many milliseconds we need to wait for the next
+ * time event to fire? */
+ long long ms =
+ (shortest->when_sec - now_sec)*1000 +
+ shortest->when_ms - now_ms;
+
+ if (ms > 0) {
+ tvp->tv_sec = ms/1000;
+ tvp->tv_usec = (ms % 1000)*1000;
+ } else {
+ tvp->tv_sec = 0;
+ tvp->tv_usec = 0;
+ }
+ } else {
+ /* If we have to check for events but need to return
+ * ASAP because of AE_DONT_WAIT we need to set the timeout
+ * to zero */
+ if (flags & AE_DONT_WAIT) {
+ tv.tv_sec = tv.tv_usec = 0;
+ tvp = &tv;
+ } else {
+ /* Otherwise we can block */
+ tvp = NULL; /* wait forever */
+ }
+ }
+
+ numevents = aeApiPoll(eventLoop, tvp);
+ for (j = 0; j < numevents; j++) {
+ aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
+ int mask = eventLoop->fired[j].mask;
+ int fd = eventLoop->fired[j].fd;
+ int rfired = 0;
+
+ /* note the fe->mask & mask & ... code: maybe an already processed
+ * event removed an element that fired and we still didn't
+ * processed, so we check if the event is still valid. */
+ if (fe->mask & mask & AE_READABLE) {
+ rfired = 1;
+ fe->rfileProc(eventLoop,fd,fe->clientData,mask);
+ }
+ if (fe->mask & mask & AE_WRITABLE) {
+ if (!rfired || fe->wfileProc != fe->rfileProc)
+ fe->wfileProc(eventLoop,fd,fe->clientData,mask);
+ }
+ processed++;
+ }
+ }
+ /* Check time events */
+ if (flags & AE_TIME_EVENTS)
+ processed += processTimeEvents(eventLoop);
+
+ return processed; /* return the number of processed file/time events */
+}
+
+/* Wait for milliseconds until the given file descriptor becomes
+ * writable/readable/exception */
+int aeWait(int fd, int mask, long long milliseconds) {
+ struct pollfd pfd;
+ int retmask = 0, retval;
+
+ memset(&pfd, 0, sizeof(pfd));
+ pfd.fd = fd;
+ if (mask & AE_READABLE) pfd.events |= POLLIN;
+ if (mask & AE_WRITABLE) pfd.events |= POLLOUT;
+
+ if ((retval = poll(&pfd, 1, milliseconds))== 1) {
+ if (pfd.revents & POLLIN) retmask |= AE_READABLE;
+ if (pfd.revents & POLLOUT) retmask |= AE_WRITABLE;
+ if (pfd.revents & POLLERR) retmask |= AE_WRITABLE;
+ if (pfd.revents & POLLHUP) retmask |= AE_WRITABLE;
+ return retmask;
+ } else {
+ return retval;
+ }
+}
+
+void aeMain(aeEventLoop *eventLoop) {
+ eventLoop->stop = 0;
+ while (!eventLoop->stop) {
+ if (eventLoop->beforesleep != NULL)
+ eventLoop->beforesleep(eventLoop);
+ aeProcessEvents(eventLoop, AE_ALL_EVENTS);
+ }
+}
+
/* Return the name reported by the compiled-in polling backend
 * (the epoll backend reports "epoll"). */
char *aeGetApiName(void)
{
    char *name = aeApiName();
    return name;
}
+
+void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) {
+ eventLoop->beforesleep = beforesleep;
+}
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/thirdparty/ae/ae.h
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/thirdparty/ae/ae.h b/cpp/src/plasma/thirdparty/ae/ae.h
new file mode 100644
index 0000000..827c4c9
--- /dev/null
+++ b/cpp/src/plasma/thirdparty/ae/ae.h
@@ -0,0 +1,123 @@
+/* A simple event-driven programming library. Originally I wrote this code
+ * for the Jim's event-loop (Jim is a Tcl interpreter) but later translated
+ * it in form of a library for easy reuse.
+ *
+ * Copyright (c) 2006-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
#ifndef __AE_H__
#define __AE_H__

#include <time.h>

/* Generic return codes of the ae API. */
#define AE_OK 0
#define AE_ERR (-1)

/* File event masks (bit flags, may be OR-ed together). */
#define AE_NONE 0
#define AE_READABLE 1
#define AE_WRITABLE 2

/* Flags accepted by aeProcessEvents(). */
#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT 4

/* Return value of an aeTimeProc meaning "do not re-arm this timer". */
#define AE_NOMORE (-1)
/* Value stored in aeTimeEvent.id to mark a timer for lazy removal. */
#define AE_DELETED_EVENT_ID (-1)

/* Macros */
/* Silence unused-variable/parameter warnings. The argument is fully
 * parenthesized so that an arbitrary expression (e.g. an assignment) is
 * cast to void as a whole rather than only its leftmost operand. */
#define AE_NOTUSED(V) ((void)(V))

struct aeEventLoop;

/* Types and data structures */

/* Called when a registered fd is ready for the conditions in mask. */
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
/* Timer callback: returning AE_NOMORE marks the timer for removal; any other
 * value re-arms it that many milliseconds from now. */
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
/* Called with the timer's clientData when the timer is removed. */
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
/* Hook called before each iteration of aeMain(). */
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);

/* File event structure */
typedef struct aeFileEvent {
    int mask; /* bitmask of AE_READABLE|AE_WRITABLE; AE_NONE when not monitored */
    aeFileProc *rfileProc; /* called when the fd is readable */
    aeFileProc *wfileProc; /* called when the fd is writable */
    void *clientData;      /* passed to both callbacks */
} aeFileEvent;

/* Time event structure: node of an unsorted, singly linked list. */
typedef struct aeTimeEvent {
    long long id; /* time event identifier, or AE_DELETED_EVENT_ID */
    long when_sec; /* absolute fire time: seconds */
    long when_ms; /* absolute fire time: milliseconds part */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event, as reported by the polling backend */
typedef struct aeFiredEvent {
    int fd;
    int mask; /* AE_READABLE and/or AE_WRITABLE */
} aeFiredEvent;

/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered (-1 when none) */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId; /* id the next created timer receives */
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events, indexed by fd */
    aeFiredEvent *fired; /* Fired events, filled by the polling backend */
    aeTimeEvent *timeEventHead; /* timer list; new timers are prepended */
    int stop;            /* non-zero makes aeMain() return */
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;

/* Prototypes */
aeEventLoop *aeCreateEventLoop(int setsize);
void aeDeleteEventLoop(aeEventLoop *eventLoop);
void aeStop(aeEventLoop *eventLoop);
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);

#endif /* __AE_H__ */
http://git-wip-us.apache.org/repos/asf/arrow/blob/2c810151/cpp/src/plasma/thirdparty/ae/ae_epoll.c
----------------------------------------------------------------------
diff --git a/cpp/src/plasma/thirdparty/ae/ae_epoll.c b/cpp/src/plasma/thirdparty/ae/ae_epoll.c
new file mode 100644
index 0000000..410aac7
--- /dev/null
+++ b/cpp/src/plasma/thirdparty/ae/ae_epoll.c
@@ -0,0 +1,135 @@
+/* Linux epoll(2) based ae.c module
+ *
+ * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * * Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ * * Neither the name of Redis nor the names of its contributors may be used
+ * to endorse or promote products derived from this software without
+ * specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+ * POSSIBILITY OF SUCH DAMAGE.
+ */
+
+
+#include <sys/epoll.h>
+
/* Per-event-loop state of the epoll backend. */
typedef struct aeApiState aeApiState;
struct aeApiState {
    int epfd;                   /* descriptor of the epoll instance */
    struct epoll_event *events; /* result buffer handed to epoll_wait() */
};
+
+static int aeApiCreate(aeEventLoop *eventLoop) {
+ aeApiState *state = zmalloc(sizeof(aeApiState));
+
+ if (!state) return -1;
+ state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
+ if (!state->events) {
+ zfree(state);
+ return -1;
+ }
+ state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
+ if (state->epfd == -1) {
+ zfree(state->events);
+ zfree(state);
+ return -1;
+ }
+ eventLoop->apidata = state;
+ return 0;
+}
+
+static int aeApiResize(aeEventLoop *eventLoop, int setsize) {
+ aeApiState *state = eventLoop->apidata;
+
+ state->events = zrealloc(state->events, sizeof(struct epoll_event)*setsize);
+ return 0;
+}
+
+static void aeApiFree(aeEventLoop *eventLoop) {
+ aeApiState *state = eventLoop->apidata;
+
+ close(state->epfd);
+ zfree(state->events);
+ zfree(state);
+}
+
+static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
+ aeApiState *state = eventLoop->apidata;
+ struct epoll_event ee = {0}; /* avoid valgrind warning */
+ /* If the fd was already monitored for some event, we need a MOD
+ * operation. Otherwise we need an ADD operation. */
+ int op = eventLoop->events[fd].mask == AE_NONE ?
+ EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+
+ ee.events = 0;
+ mask |= eventLoop->events[fd].mask; /* Merge old events */
+ if (mask & AE_READABLE) ee.events |= EPOLLIN;
+ if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
+ ee.data.fd = fd;
+ if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
+ return 0;
+}
+
+static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) {
+ aeApiState *state = eventLoop->apidata;
+ struct epoll_event ee = {0}; /* avoid valgrind warning */
+ int mask = eventLoop->events[fd].mask & (~delmask);
+
+ ee.events = 0;
+ if (mask & AE_READABLE) ee.events |= EPOLLIN;
+ if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
+ ee.data.fd = fd;
+ if (mask != AE_NONE) {
+ epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee);
+ } else {
+ /* Note, Kernel < 2.6.9 requires a non null event pointer even for
+ * EPOLL_CTL_DEL. */
+ epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee);
+ }
+}
+
+static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
+ aeApiState *state = eventLoop->apidata;
+ int retval, numevents = 0;
+
+ retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
+ tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
+ if (retval > 0) {
+ int j;
+
+ numevents = retval;
+ for (j = 0; j < numevents; j++) {
+ int mask = 0;
+ struct epoll_event *e = state->events+j;
+
+ if (e->events & EPOLLIN) mask |= AE_READABLE;
+ if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
+ if (e->events & EPOLLERR) mask |= AE_WRITABLE;
+ if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
+ eventLoop->fired[j].fd = e->data.fd;
+ eventLoop->fired[j].mask = mask;
+ }
+ }
+ return numevents;
+}
+
/* Name of this polling backend. */
static char *aeApiName(void)
{
    return "epoll";
}