You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by uw...@apache.org on 2018/05/05 08:37:56 UTC
[arrow] branch master updated: ARROW-2541: [Plasma] Replace macros with constexpr
This is an automated email from the ASF dual-hosted git repository.
uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 5bdfff8 ARROW-2541: [Plasma] Replace macros with constexpr
5bdfff8 is described below
commit 5bdfff8612c30b0e458d026e7a11b859663fa00b
Author: Philipp Moritz <pc...@gmail.com>
AuthorDate: Sat May 5 10:37:48 2018 +0200
ARROW-2541: [Plasma] Replace macros with constexpr
Author: Philipp Moritz <pc...@gmail.com>
Closes #2001 from pcmoritz/remove-macros and squashes the following commits:
cc3a7d24 <Philipp Moritz> fix comments
3ac436b1 <Philipp Moritz> fix linting
de7b0b63 <Philipp Moritz> more cleanups
9024214e <Philipp Moritz> fix linting
a59e1b5b <Philipp Moritz> fix documentation
176b7c66 <Philipp Moritz> clean up macros
---
cpp/apidoc/tutorials/plasma.md | 8 +++----
cpp/src/plasma/client.cc | 4 ++--
cpp/src/plasma/client.h | 13 ++++++++---
cpp/src/plasma/format/plasma.fbs | 4 +++-
cpp/src/plasma/io.cc | 43 ++++++++++++++++++++-----------------
cpp/src/plasma/io.h | 7 +++---
cpp/src/plasma/plasma.h | 3 ++-
cpp/src/plasma/store.cc | 6 +++---
cpp/src/plasma/test/client_tests.cc | 6 ++----
9 files changed, 52 insertions(+), 42 deletions(-)
diff --git a/cpp/apidoc/tutorials/plasma.md b/cpp/apidoc/tutorials/plasma.md
index 8d54a10..5a0a2ba 100644
--- a/cpp/apidoc/tutorials/plasma.md
+++ b/cpp/apidoc/tutorials/plasma.md
@@ -80,7 +80,7 @@ using namespace plasma;
int main(int argc, char** argv) {
// Start up and connect a Plasma client.
PlasmaClient client;
- ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
+ ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
// Disconnect the Plasma client.
ARROW_CHECK_OK(client.Disconnect());
}
@@ -218,7 +218,7 @@ using namespace plasma;
int main(int argc, char** argv) {
// Start up and connect a Plasma client.
PlasmaClient client;
- ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
+ ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
// Create an object with a fixed ObjectID.
ObjectID object_id = ObjectID::from_binary("00000000000000000000");
int64_t data_size = 1000;
@@ -323,7 +323,7 @@ using namespace plasma;
int main(int argc, char** argv) {
// Start up and connect a Plasma client.
PlasmaClient client;
- ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
+ ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
ObjectID object_id = ObjectID::from_binary("00000000000000000000");
ObjectBuffer object_buffer;
ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer));
@@ -411,7 +411,7 @@ using namespace plasma;
int main(int argc, char** argv) {
// Start up and connect a Plasma client.
PlasmaClient client;
- ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
+ ARROW_CHECK_OK(client.Connect("/tmp/plasma", ""));
int fd;
ARROW_CHECK_OK(client.Subscribe(&fd));
diff --git a/cpp/src/plasma/client.cc b/cpp/src/plasma/client.cc
index b4ee098..a332686 100644
--- a/cpp/src/plasma/client.cc
+++ b/cpp/src/plasma/client.cc
@@ -703,8 +703,8 @@ bool PlasmaClient::Impl::compute_object_hash_parallel(XXH64_state_t* hash_state,
const int num_threads = kThreadPoolSize;
uint64_t threadhash[num_threads + 1];
const uint64_t data_address = reinterpret_cast<uint64_t>(data);
- const uint64_t num_blocks = nbytes / BLOCK_SIZE;
- const uint64_t chunk_size = (num_blocks / num_threads) * BLOCK_SIZE;
+ const uint64_t num_blocks = nbytes / kBlockSize;
+ const uint64_t chunk_size = (num_blocks / num_threads) * kBlockSize;
const uint64_t right_address = data_address + chunk_size * num_threads;
const uint64_t suffix = (data_address + nbytes) - right_address;
// Now the data layout is | k * num_threads * block_size | suffix | ==
diff --git a/cpp/src/plasma/client.h b/cpp/src/plasma/client.h
index 4221997..4e1ff4a 100644
--- a/cpp/src/plasma/client.h
+++ b/cpp/src/plasma/client.h
@@ -34,7 +34,14 @@ using arrow::Status;
namespace plasma {
-#define PLASMA_DEFAULT_RELEASE_DELAY 64
+ARROW_DEPRECATED("PLASMA_DEFAULT_RELEASE_DELAY is deprecated")
+constexpr int64_t kDeprecatedPlasmaDefaultReleaseDelay = 64;
+#define PLASMA_DEFAULT_RELEASE_DELAY kDeprecatedPlasmaDefaultReleaseDelay;
+
+/// We keep a queue of unreleased objects cached in the client until we start
+/// sending release requests to the store. This is to avoid frequently mapping
+/// and unmapping objects and evicting data from processor caches.
+constexpr int64_t kPlasmaDefaultReleaseDelay = 64;
/// Object buffer data structure.
struct ObjectBuffer {
@@ -64,8 +71,8 @@ class ARROW_EXPORT PlasmaClient {
/// \param num_retries number of attempts to connect to IPC socket, default 50
/// \return The return status.
Status Connect(const std::string& store_socket_name,
- const std::string& manager_socket_name, int release_delay,
- int num_retries = -1);
+ const std::string& manager_socket_name,
+ int release_delay = kPlasmaDefaultReleaseDelay, int num_retries = -1);
/// Create an object in the Plasma Store. Any metadata for this object must be
/// be passed in when the object is created.
diff --git a/cpp/src/plasma/format/plasma.fbs b/cpp/src/plasma/format/plasma.fbs
index 0258cdf..6a58fb0 100644
--- a/cpp/src/plasma/format/plasma.fbs
+++ b/cpp/src/plasma/format/plasma.fbs
@@ -18,8 +18,10 @@
// Plasma protocol specification
enum MessageType:int {
+ // Message that gets send when a client hangs up.
+ PlasmaDisconnectClient = 0,
// Create a new object.
- PlasmaCreateRequest = 1,
+ PlasmaCreateRequest,
PlasmaCreateReply,
PlasmaAbortRequest,
PlasmaAbortReply,
diff --git a/cpp/src/plasma/io.cc b/cpp/src/plasma/io.cc
index 4142bf9..aefd297 100644
--- a/cpp/src/plasma/io.cc
+++ b/cpp/src/plasma/io.cc
@@ -24,12 +24,14 @@
#include "arrow/status.h"
#include "plasma/common.h"
+#include "plasma/plasma_generated.h"
using arrow::Status;
-/* Number of times we try connecting to a socket. */
-#define NUM_CONNECT_ATTEMPTS 50
-#define CONNECT_TIMEOUT_MS 100
+/// Number of times we try connecting to a socket.
+constexpr int64_t kNumConnectAttempts = 50;
+/// Time to wait between connection attempts to a socket.
+constexpr int64_t kConnectTimeoutMs = 100;
namespace plasma {
@@ -38,8 +40,8 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
size_t bytesleft = length;
size_t offset = 0;
while (bytesleft > 0) {
- /* While we haven't written the whole message, write to the file descriptor,
- * advance the cursor, and decrease the amount left to write. */
+ // While we haven't written the whole message, write to the file descriptor,
+ // advance the cursor, and decrease the amount left to write.
nbytes = write(fd, cursor + offset, bytesleft);
if (nbytes < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) {
@@ -58,7 +60,7 @@ Status WriteBytes(int fd, uint8_t* cursor, size_t length) {
}
Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
- int64_t version = PLASMA_PROTOCOL_VERSION;
+ int64_t version = kPlasmaProtocolVersion;
RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)));
RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&type), sizeof(type)));
RETURN_NOT_OK(WriteBytes(fd, reinterpret_cast<uint8_t*>(&length), sizeof(length)));
@@ -67,7 +69,7 @@ Status WriteMessage(int fd, int64_t type, int64_t length, uint8_t* bytes) {
Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
ssize_t nbytes = 0;
- /* Termination condition: EOF or read 'length' bytes total. */
+ // Termination condition: EOF or read 'length' bytes total.
size_t bytesleft = length;
size_t offset = 0;
while (bytesleft > 0) {
@@ -91,20 +93,21 @@ Status ReadBytes(int fd, uint8_t* cursor, size_t length) {
Status ReadMessage(int fd, int64_t* type, std::vector<uint8_t>* buffer) {
int64_t version;
RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(&version), sizeof(version)),
- *type = DISCONNECT_CLIENT);
- ARROW_CHECK(version == PLASMA_PROTOCOL_VERSION) << "version = " << version;
+ *type = MessageType_PlasmaDisconnectClient);
+ ARROW_CHECK(version == kPlasmaProtocolVersion) << "version = " << version;
RETURN_NOT_OK_ELSE(ReadBytes(fd, reinterpret_cast<uint8_t*>(type), sizeof(*type)),
- *type = DISCONNECT_CLIENT);
+ *type = MessageType_PlasmaDisconnectClient);
int64_t length_temp;
RETURN_NOT_OK_ELSE(
ReadBytes(fd, reinterpret_cast<uint8_t*>(&length_temp), sizeof(length_temp)),
- *type = DISCONNECT_CLIENT);
+ *type = MessageType_PlasmaDisconnectClient);
// The length must be read as an int64_t, but it should be used as a size_t.
size_t length = static_cast<size_t>(length_temp);
if (length > buffer->size()) {
buffer->resize(length);
}
- RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length), *type = DISCONNECT_CLIENT);
+ RETURN_NOT_OK_ELSE(ReadBytes(fd, buffer->data(), length),
+ *type = MessageType_PlasmaDisconnectClient);
return Status::OK();
}
@@ -115,7 +118,7 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
ARROW_LOG(ERROR) << "socket() failed for pathname " << pathname;
return -1;
}
- /* Tell the system to allow the port to be reused. */
+ // Tell the system to allow the port to be reused.
int on = 1;
if (setsockopt(socket_fd, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&on),
sizeof(on)) < 0) {
@@ -150,23 +153,23 @@ int bind_ipc_sock(const std::string& pathname, bool shall_listen) {
Status ConnectIpcSocketRetry(const std::string& pathname, int num_retries,
int64_t timeout, int* fd) {
- /* Pick the default values if the user did not specify. */
+ // Pick the default values if the user did not specify.
if (num_retries < 0) {
- num_retries = NUM_CONNECT_ATTEMPTS;
+ num_retries = kNumConnectAttempts;
}
if (timeout < 0) {
- timeout = CONNECT_TIMEOUT_MS;
+ timeout = kConnectTimeoutMs;
}
*fd = connect_ipc_sock(pathname);
while (*fd < 0 && num_retries > 0) {
ARROW_LOG(ERROR) << "Connection to IPC socket failed for pathname " << pathname
<< ", retrying " << num_retries << " more times";
- /* Sleep for timeout milliseconds. */
+ // Sleep for timeout milliseconds.
usleep(static_cast<int>(timeout * 1000));
*fd = connect_ipc_sock(pathname);
--num_retries;
}
- /* If we could not connect to the socket, exit. */
+ // If we could not connect to the socket, exit.
if (*fd == -1) {
std::stringstream ss;
ss << "Could not connect to socket " << pathname;
@@ -215,7 +218,7 @@ std::unique_ptr<uint8_t[]> read_message_async(int sock) {
int64_t size;
Status s = ReadBytes(sock, reinterpret_cast<uint8_t*>(&size), sizeof(int64_t));
if (!s.ok()) {
- /* The other side has closed the socket. */
+ // The other side has closed the socket.
ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
close(sock);
return NULL;
@@ -223,7 +226,7 @@ std::unique_ptr<uint8_t[]> read_message_async(int sock) {
auto message = std::unique_ptr<uint8_t[]>(new uint8_t[size]);
s = ReadBytes(sock, message.get(), size);
if (!s.ok()) {
- /* The other side has closed the socket. */
+ // The other side has closed the socket.
ARROW_LOG(DEBUG) << "Socket has been closed, or some other error has occurred.";
close(sock);
return NULL;
diff --git a/cpp/src/plasma/io.h b/cpp/src/plasma/io.h
index 8869c9b..f0d57c0 100644
--- a/cpp/src/plasma/io.h
+++ b/cpp/src/plasma/io.h
@@ -30,13 +30,12 @@
#include "arrow/status.h"
#include "plasma/compat.h"
+namespace plasma {
+
// TODO(pcm): Replace our own custom message header (message type,
// message length, plasma protocol verion) with one that is serialized
// using flatbuffers.
-#define PLASMA_PROTOCOL_VERSION 0x0000000000000000
-#define DISCONNECT_CLIENT 0
-
-namespace plasma {
+constexpr int64_t kPlasmaProtocolVersion = 0x0000000000000000;
using arrow::Status;
diff --git a/cpp/src/plasma/plasma.h b/cpp/src/plasma/plasma.h
index 4b4064c..a1d6e99 100644
--- a/cpp/src/plasma/plasma.h
+++ b/cpp/src/plasma/plasma.h
@@ -36,6 +36,7 @@
#include "arrow/status.h"
#include "arrow/util/logging.h"
+#include "arrow/util/macros.h"
#include "plasma/common.h"
#include "plasma/common_generated.h"
@@ -65,7 +66,7 @@ namespace plasma {
} while (0);
/// Allocation granularity used in plasma for object allocation.
-#define BLOCK_SIZE 64
+constexpr int64_t kBlockSize = 64;
struct Client;
diff --git a/cpp/src/plasma/store.cc b/cpp/src/plasma/store.cc
index 2caa3cb..c06ad6a 100644
--- a/cpp/src/plasma/store.cc
+++ b/cpp/src/plasma/store.cc
@@ -170,7 +170,7 @@ int PlasmaStore::create_object(const ObjectID& object_id, int64_t data_size,
// 64-byte aligned, but in practice it often will be.
if (device_num == 0) {
pointer =
- reinterpret_cast<uint8_t*>(dlmemalign(BLOCK_SIZE, data_size + metadata_size));
+ reinterpret_cast<uint8_t*>(dlmemalign(kBlockSize, data_size + metadata_size));
if (pointer == NULL) {
// Tell the eviction policy how much space we need to create this object.
std::vector<ObjectID> objects_to_evict;
@@ -741,7 +741,7 @@ Status PlasmaStore::process_message(Client* client) {
HANDLE_SIGPIPE(SendConnectReply(client->fd, store_info_.memory_capacity),
client->fd);
} break;
- case DISCONNECT_CLIENT:
+ case MessageType_PlasmaDisconnectClient:
ARROW_LOG(DEBUG) << "Disconnecting client on fd " << client->fd;
disconnect_client(client->fd);
break;
@@ -768,7 +768,7 @@ class PlasmaStoreRunner {
// achieve that by mallocing and freeing a single large amount of space.
// that maximum allowed size up front.
if (use_one_memory_mapped_file) {
- void* pointer = plasma::dlmemalign(BLOCK_SIZE, system_memory);
+ void* pointer = plasma::dlmemalign(kBlockSize, system_memory);
ARROW_CHECK(pointer != NULL);
plasma::dlfree(pointer);
}
diff --git a/cpp/src/plasma/test/client_tests.cc b/cpp/src/plasma/test/client_tests.cc
index dad7688..53743ca 100644
--- a/cpp/src/plasma/test/client_tests.cc
+++ b/cpp/src/plasma/test/client_tests.cc
@@ -59,10 +59,8 @@ class TestPlasmaStore : public ::testing::Test {
"/plasma_store -m 1000000000 -s /tmp/store" +
store_index + " 1> /dev/null 2> /dev/null &";
system(plasma_command.c_str());
- ARROW_CHECK_OK(
- client_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
- ARROW_CHECK_OK(
- client2_.Connect("/tmp/store" + store_index, "", PLASMA_DEFAULT_RELEASE_DELAY));
+ ARROW_CHECK_OK(client_.Connect("/tmp/store" + store_index, ""));
+ ARROW_CHECK_OK(client2_.Connect("/tmp/store" + store_index, ""));
}
virtual void TearDown() {
ARROW_CHECK_OK(client_.Disconnect());
--
To stop receiving notification emails like this one, please contact
uwe@apache.org.